This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new a640579a55a [improve][offload] Coalesce automatic offload triggers to
reduce retry loops and ledger scans (#25793)
a640579a55a is described below
commit a640579a55ab64c006aaeee0e17d789c3ff9d823
Author: void-ptr974 <[email protected]>
AuthorDate: Thu Jun 4 02:16:26 2026 +0800
[improve][offload] Coalesce automatic offload triggers to reduce retry
loops and ledger scans (#25793)
(cherry picked from commit 7ecedb8265b1d802ef1a571d7780f4a73141251b)
---
.../impl/AutomaticOffloadTriggerController.java | 86 ++++++++++++++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 90 +++++++++++++--
.../AutomaticOffloadTriggerControllerTest.java | 85 ++++++++++++++
.../bookkeeper/mledger/impl/OffloadPrefixTest.java | 124 +++++++++++++++++++++
5 files changed, 375 insertions(+), 14 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java
new file mode 100644
index 00000000000..35f5a267063
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Coalesces repeated automatic offload triggers into at most one active run and one follow-up run.
 *
 * <p>The state is a single {@link AtomicInteger} holding one of three values:
 * <ul>
 *   <li>{@code IDLE}: no automatic run is active;</li>
 *   <li>{@code RUNNING}: a run is active and no trigger has arrived since it started;</li>
 *   <li>{@code RUNNING_WITH_PENDING_TRIGGER}: a run is active and one or more triggers arrived while it
 *       was running; all of them are collapsed into a single follow-up run.</li>
 * </ul>
 *
 * <p>Every transition is a compare-and-set retry loop, so {@link #requestRun()} and
 * {@link #completeRun()} may race from different threads without external locking. Each {@code true}
 * result from either method hands the caller exactly one run, and that run must later be reported
 * through {@link #completeRun()}. A run that is never reported leaves the controller in a running
 * state, and every subsequent trigger is coalesced away.
 */
final class AutomaticOffloadTriggerController {
    private static final int IDLE = 0;
    private static final int RUNNING = 1;
    private static final int RUNNING_WITH_PENDING_TRIGGER = 2;

    private final AtomicInteger state = new AtomicInteger(IDLE);

    /**
     * Records an automatic offload trigger.
     *
     * <p>From {@code IDLE} the caller wins the right to start a run. While a run is active, the trigger is
     * remembered as the (single) pending follow-up and the caller must not start anything.
     *
     * @return true when the caller must start a new automatic offload run
     * @throws IllegalStateException if the internal state holds an unknown value
     */
    boolean requestRun() {
        while (true) {
            int current = state.get();
            switch (current) {
                case IDLE:
                    if (state.compareAndSet(IDLE, RUNNING)) {
                        return true;
                    }
                    break; // lost a race with another thread; re-read the state
                case RUNNING:
                    if (state.compareAndSet(RUNNING, RUNNING_WITH_PENDING_TRIGGER)) {
                        return false;
                    }
                    break; // lost a race with another thread; re-read the state
                case RUNNING_WITH_PENDING_TRIGGER:
                    // A follow-up run is already reserved; this trigger adds nothing new.
                    return false;
                default:
                    throw new IllegalStateException("Unknown automatic offload trigger state: " + current);
            }
        }
    }

    /**
     * Records completion of the current automatic offload run.
     *
     * <p>If triggers arrived during the run, the controller stays running and hands the caller one
     * coalesced follow-up run. Otherwise it returns to idle. Calling this while idle is a no-op.
     *
     * @return true when the caller must immediately start one coalesced follow-up run
     * @throws IllegalStateException if the internal state holds an unknown value
     */
    boolean completeRun() {
        while (true) {
            int current = state.get();
            switch (current) {
                case IDLE:
                    return false;
                case RUNNING:
                    if (state.compareAndSet(RUNNING, IDLE)) {
                        return false;
                    }
                    break; // a trigger raced in; re-read the state
                case RUNNING_WITH_PENDING_TRIGGER:
                    if (state.compareAndSet(RUNNING_WITH_PENDING_TRIGGER, RUNNING)) {
                        return true;
                    }
                    break; // re-read the state
                default:
                    throw new IllegalStateException("Unknown automatic offload trigger state: " + current);
            }
        }
    }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 4e530a185cc..a719d484beb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -19,7 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static
org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
-import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
+import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
@@ -487,7 +487,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
future.complete(newledger);
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
-
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+
newledger.maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);
}
});
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 54be0723d47..b1a7e199740 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -224,8 +224,20 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
protected final CallbackMutex trimmerMutex = new CallbackMutex();
protected final CallbackMutex offloadMutex = new CallbackMutex();
- public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE =
CompletableFuture
+ private final AutomaticOffloadTriggerController
automaticOffloadTriggerController =
+ new AutomaticOffloadTriggerController();
+ // Identity sentinel for automatic offload requests. The completed
Position value is not used.
+ public static final CompletableFuture<Position> AUTOMATIC_OFFLOAD_TRIGGER
= CompletableFuture
.completedFuture(PositionFactory.LATEST);
+
+ private enum OffloadRequestSource {
+ AUTOMATIC,
+ EXPLICIT
+ }
+
+ private record OffloadThresholds(long thresholdInBytes, long
thresholdInSeconds) {
+ }
+
@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
@@ -1970,7 +1982,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
trimConsumedLedgersInBackground();
- maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+ maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER);
createLedgerAfterClosed();
}
@@ -2803,22 +2815,73 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
public void maybeOffloadInBackground(CompletableFuture<Position> promise) {
- if (getOffloadPoliciesIfAppendable().isEmpty()) {
+ if (promise == AUTOMATIC_OFFLOAD_TRIGGER) {
+ if (automaticOffloadTriggerController.requestRun()) {
+ startAutomaticOffload();
+ }
+ return;
+ }
+
+ maybeOffloadInBackground(promise, OffloadRequestSource.EXPLICIT);
+ }
+
+ private void startAutomaticOffload() {
+ CompletableFuture<Position> automaticOffloadCompletion = new
CompletableFuture<>();
+ automaticOffloadCompletion.whenComplete((res, ex) ->
finishAutomaticOffload(ex));
+ try {
+ maybeOffloadInBackground(automaticOffloadCompletion,
OffloadRequestSource.AUTOMATIC);
+ } catch (RuntimeException e) {
+ automaticOffloadCompletion.completeExceptionally(e);
+ }
+ }
+
+ private void maybeOffloadInBackground(CompletableFuture<Position> promise,
OffloadRequestSource source) {
+ Optional<OffloadThresholds> offloadThresholds = getOffloadThresholds();
+ if (offloadThresholds.isEmpty()) {
+ if (source == OffloadRequestSource.AUTOMATIC) {
+ promise.complete(PositionFactory.LATEST);
+ }
return;
}
- final OffloadPolicies policies =
config.getLedgerOffloader().getOffloadPolicies();
+ OffloadThresholds thresholds = offloadThresholds.get();
+ try {
+ executor.execute(() -> maybeOffload(thresholds.thresholdInBytes(),
thresholds.thresholdInSeconds(),
+ promise, source));
+ } catch (RuntimeException e) {
+ promise.completeExceptionally(e);
+ }
+ }
+
+ private Optional<OffloadThresholds> getOffloadThresholds() {
+ Optional<OffloadPolicies> optionalOffloadPolicies =
getOffloadPoliciesIfAppendable();
+ if (optionalOffloadPolicies.isEmpty()) {
+ return Optional.empty();
+ }
+
+ final OffloadPolicies policies = optionalOffloadPolicies.get();
final long offloadThresholdInBytes =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L);
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
- executor.execute(() -> maybeOffload(offloadThresholdInBytes,
offloadThresholdInSeconds, promise));
+ return Optional.of(new OffloadThresholds(offloadThresholdInBytes,
offloadThresholdInSeconds));
+ }
+
+ return Optional.empty();
+ }
+
+ private void finishAutomaticOffload(Throwable exception) {
+ if (exception != null && log.isDebugEnabled()) {
+ log.debug("Failed to automatically offload ledgers", exception);
+ }
+ if (automaticOffloadTriggerController.completeRun()) {
+ startAutomaticOffload();
}
}
private void maybeOffload(long offloadThresholdInBytes, long
offloadThresholdInSeconds,
- CompletableFuture<Position> finalPromise) {
+ CompletableFuture<Position> finalPromise,
OffloadRequestSource source) {
if (getOffloadPoliciesIfAppendable().isEmpty()) {
String msg = String.format("[%s] Nothing to offload due to
offloader or offloadPolicies is NULL", name);
finalPromise.completeExceptionally(new
IllegalArgumentException(msg));
@@ -2833,8 +2896,12 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
if (!offloadMutex.tryLock()) {
- scheduledExecutor.schedule(() ->
maybeOffloadInBackground(finalPromise),
- 100, TimeUnit.MILLISECONDS);
+ try {
+ scheduledExecutor.schedule(() ->
maybeOffloadInBackground(finalPromise, source),
+ 100, TimeUnit.MILLISECONDS);
+ } catch (RuntimeException e) {
+ finalPromise.completeExceptionally(e);
+ }
return;
}
@@ -2924,12 +2991,11 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
private Optional<OffloadPolicies> getOffloadPoliciesIfAppendable() {
LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
- if (ledgerOffloader == null
- || !ledgerOffloader.isAppendable()
- || ledgerOffloader.getOffloadPolicies() == null) {
+ if (ledgerOffloader == null || !ledgerOffloader.isAppendable()) {
return Optional.empty();
}
- return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
+ OffloadPolicies offloadPolicies = ledgerOffloader.getOffloadPolicies();
+ return Optional.ofNullable(offloadPolicies);
}
@VisibleForTesting
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java
new file mode 100644
index 00000000000..9aefc7e3e1e
--- /dev/null
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+public class AutomaticOffloadTriggerControllerTest {
+
+ @Test
+ public void triggersCoalesceWhileRunIsActive() {
+ AutomaticOffloadTriggerController controller = new
AutomaticOffloadTriggerController();
+
+ assertThat(controller.requestRun()).isTrue();
+ assertThat(controller.requestRun()).isFalse();
+ assertThat(controller.requestRun()).isFalse();
+ }
+
+ @Test
+ public void pendingTriggerSchedulesOneFollowUpRun() {
+ AutomaticOffloadTriggerController controller = new
AutomaticOffloadTriggerController();
+
+ assertThat(controller.requestRun()).isTrue();
+ assertThat(controller.requestRun()).isFalse();
+
+ assertThat(controller.completeRun()).isTrue();
+ assertThat(controller.completeRun()).isFalse();
+ assertThat(controller.requestRun()).isTrue();
+ }
+
+ @Test(timeOut = 30000)
+ public void concurrentTriggerAndCompletionAlwaysReserveOneFollowUpRun()
throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ for (int i = 0; i < 1000; i++) {
+ AutomaticOffloadTriggerController controller = new
AutomaticOffloadTriggerController();
+ assertThat(controller.requestRun()).isTrue();
+
+ // Completion and a new trigger can race; exactly one side
must reserve the follow-up run.
+ CyclicBarrier barrier = new CyclicBarrier(3);
+ Future<Boolean> completeResult = executor.submit(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ return controller.completeRun();
+ });
+ Future<Boolean> triggerResult = executor.submit(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ return controller.requestRun();
+ });
+
+ barrier.await(5, TimeUnit.SECONDS);
+ boolean followUpReservedByComplete = completeResult.get(5,
TimeUnit.SECONDS);
+ boolean followUpReservedByTrigger = triggerResult.get(5,
TimeUnit.SECONDS);
+
+ assertThat(followUpReservedByComplete)
+ .as("iteration %s must reserve exactly one follow-up
run", i)
+ .isNotEqualTo(followUpReservedByTrigger);
+ assertThat(controller.completeRun()).isFalse();
+ }
+ } finally {
+ executor.shutdownNow();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index b9c95f445cc..76c223b0405 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -1184,6 +1184,130 @@ public class OffloadPrefixTest extends
MockedBookKeeperTestCase {
}
}
+ @Test
+ public void automaticOffloadTriggersAreCoalescedWhileOffloadInProgress()
throws Exception {
+ CompletableFuture<Void> slowOffload = new CompletableFuture<>();
+ CountDownLatch offloadRunning = new CountDownLatch(1);
+ AtomicInteger offloadPolicyCalls = new AtomicInteger();
+ MockLedgerOffloader offloader = new MockLedgerOffloader() {
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle ledger,
+ UUID uuid,
+ Map<String, String>
extraMetadata) {
+ offloadRunning.countDown();
+ return slowOffload.thenCompose((res) ->
super.offload(ledger, uuid, extraMetadata));
+ }
+
+ @Override
+ public OffloadPoliciesImpl getOffloadPolicies() {
+ offloadPolicyCalls.incrementAndGet();
+ return super.getOffloadPolicies();
+ }
+ };
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("my_test_ledger" +
UUID.randomUUID(), config);
+
+ for (int i = 0; i < 25; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+ assertTrue(offloadRunning.await(5, TimeUnit.SECONDS));
+
+ // Repeated automatic triggers should stop at the controller and avoid
another policy lookup.
+ int callsBeforeRepeatedTriggers = offloadPolicyCalls.get();
+ for (int i = 0; i < 20; i++) {
+
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);
+ }
+
+ assertEquals(offloadPolicyCalls.get(), callsBeforeRepeatedTriggers);
+
+ slowOffload.complete(null);
+
+ assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+ List<Long> allLedgerIds =
ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+ assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0),
allLedgerIds.get(1)));
+ }
+
+ @Test
+ public void automaticOffloadRunsAgainForCoalescedTrigger() throws
Exception {
+ CompletableFuture<Void> slowOffload = new CompletableFuture<>();
+ CountDownLatch offloadRunning = new CountDownLatch(1);
+ MockLedgerOffloader offloader = new MockLedgerOffloader() {
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle ledger,
+ UUID uuid,
+ Map<String, String>
extraMetadata) {
+ offloadRunning.countDown();
+ return slowOffload.thenCompose((res) ->
super.offload(ledger, uuid, extraMetadata));
+ }
+ };
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("my_test_ledger" +
UUID.randomUUID(), config);
+
+ for (int i = 0; i < 11; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+ assertTrue(offloadRunning.await(5, TimeUnit.SECONDS));
+
+ // The next ledger closes after the first automatic scan, so it
depends on the coalesced rerun.
+ for (int i = 11; i < 21; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+ assertEquals(offloader.offloadedLedgers().size(), 0);
+
+ slowOffload.complete(null);
+
+ assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+ List<Long> allLedgerIds =
ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+ assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0),
allLedgerIds.get(1)));
+ }
+
+ @Test
+ public void automaticOffloadWithoutThresholdDoesNotBlockLaterTriggers()
throws Exception {
+ MockLedgerOffloader offloader = new MockLedgerOffloader();
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(-1L);
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null);
+ config.setLedgerOffloader(offloader);
+
+ ManagedLedgerImpl ledger =
+ (ManagedLedgerImpl) factory.open("my_test_ledger" +
UUID.randomUUID(), config);
+
+ for (int i = 0; i < 25; i++) {
+ ledger.addEntry(buildEntry(10, "entry-" + i));
+ }
+
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);
+ assertEquals(offloader.offloadedLedgers().size(), 0);
+
+ // A disabled automatic trigger must complete internally so a later
valid trigger can run.
+
offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L);
+
ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER);
+
+ assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
+ List<Long> allLedgerIds =
ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList();
+ assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0),
allLedgerIds.get(1)));
+ }
+
@DataProvider(name = "offloadAsSoonAsClosed")
public Object[][] offloadAsSoonAsClosedProvider() {
return new Object[][]{