This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 29778d5bd3c [improve][broker]Find the target position at most once, during expiring messages for a topic, even though there are many subscriptions (#24622) 29778d5bd3c is described below commit 29778d5bd3c97d065b5dbdb935cc8689e00d5c4c Author: Apurva007 <apurvatelan...@gmail.com> AuthorDate: Tue Aug 20 16:10:26 2024 +0800 [improve][broker]Find the target position at most once, during expiring messages for a topic, even though there are many subscriptions (#24622) (cherry picked from commit 84205ebd849479edac2c6533ea9259091e2e5bed) --- .../bookkeeper/mledger/impl/OpFindNewest.java | 3 + .../pulsar/broker/service/MessageExpirer.java | 10 +++ .../persistent/PersistentMessageExpiryMonitor.java | 15 ++++ .../broker/service/persistent/PersistentTopic.java | 76 +++++++++++++++++++-- .../impl}/PersistentMessageExpiryMonitorTest.java | 79 ++++++++++++++++++---- .../broker/service/PersistentTopicE2ETest.java | 4 +- .../pulsar/broker/stats/PrometheusMetricsTest.java | 7 +- 7 files changed, 171 insertions(+), 23 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 8f86cb33ae8..31c1a090b50 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -250,6 +250,9 @@ class OpFindNewest implements ReadEntryCallback { return nextPosition; } + /** + * Find the largest entry that matches the given predicate. + */ public void find() { if (cursor != null ? 
cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { ledger.asyncReadEntry(searchPosition, this, null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java index 3008717a3df..60cf5bd0523 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java @@ -25,9 +25,19 @@ import org.apache.pulsar.common.classification.InterfaceStability; @InterfaceStability.Evolving public interface MessageExpirer { + /** + * Mark-delete the largest position that is less than or equal to {@code position}. + */ boolean expireMessages(Position position); + /** + * Mark-delete the largest message whose publish timestamp is less than the result of the expression + * {@code System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTTLInSeconds)}. + */ boolean expireMessages(int messageTTLInSeconds); + /** + * Async implementation of {@link #expireMessages(int)}. 
+ */ CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 348284fc624..a51f02b0c0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistExcept import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.client.impl.MessageImpl; @@ -145,6 +146,20 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. Position topicLastPosition = this.topic.getLastPosition(); + ManagedLedger managedLedger = cursor.getManagedLedger(); + if (managedLedger instanceof ManagedLedgerImpl ml) { + // Confirm the position is valid. + Optional<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoOptional = + ml.getOptionalLedgerInfo(messagePosition.getLedgerId()); + if (ledgerInfoOptional.isPresent()) { + if (messagePosition.getEntryId() >= 0 + && ledgerInfoOptional.get().getEntries() - 1 >= messagePosition.getEntryId()) { + findEntryComplete(messagePosition, null); + return true; + } + } + } + // Fallback to the slower solution if the managed ledger is not an instance of ManagedLedgerImpl. 
if (topicLastPosition.compareTo(messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index b31ae804247..061cbb1b13e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.PositionBound; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -2115,19 +2116,82 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public void checkMessageExpiry() { int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get(); - if (messageTtlInSeconds != 0) { + if (messageTtlInSeconds <= 0) { + return; + } + + ManagedLedger managedLedger = getManagedLedger(); + if (managedLedger instanceof ManagedLedgerImpl ml) { + checkMessageExpiryWithSharedPosition(ml, messageTtlInSeconds); + } else { + // Fallback to the slower solution if managed ledger is not an instance of ManagedLedgerImpl: each + // subscription find position and handle expiring itself. 
+ checkMessageExpiryWithoutSharedPosition(messageTtlInSeconds); + } + } + + private void checkMessageExpiryWithoutSharedPosition(int messageTtlInSeconds) { + subscriptions.forEach((__, sub) -> { + if (!isCompactionSubscription(sub.getName()) + && (additionalSystemCursorNames.isEmpty() + || !additionalSystemCursorNames.contains(sub.getName()))) { + sub.expireMessagesAsync(messageTtlInSeconds); + } + }); + replicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); + shadowReplicators.forEach((__, replicator) + -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); + } + + private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, int messageTtlInSeconds) { + // Find the target position at one time, then expire all subscriptions and replicators. + ManagedCursor cursor = ml.getCursors().getCursorWithOldestPosition().getCursor(); + PersistentMessageFinder finder = new PersistentMessageFinder(topic, cursor, brokerService.getPulsar() + .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); + // Find the target position. + long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTtlInSeconds); + CompletableFuture<Position> positionToMarkDelete = new CompletableFuture<>(); + finder.findMessages(expiredMessageTimestamp, new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + positionToMarkDelete.complete(position); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, + Object ctx) { + log.error("[{}] Error finding expired position, failed reading position is {}", topic, + failedReadPosition.orElse(null), exception); + // Since we have logged the error, we can skip to print error log at next step. 
+ positionToMarkDelete.complete(null); + } + }); + positionToMarkDelete.thenAccept(position -> { + if (position == null) { + // Nothing needs to be expired. + return; + } + // Expire messages by position, which is more efficient. subscriptions.forEach((__, sub) -> { if (!isCompactionSubscription(sub.getName()) && (additionalSystemCursorNames.isEmpty() - || !additionalSystemCursorNames.contains(sub.getName()))) { - sub.expireMessagesAsync(messageTtlInSeconds); + || !additionalSystemCursorNames.contains(sub.getName()))) { + // "position" is the position to mark-delete. + // The method "expireMessages(position)" mark-deletes the target position if the + // position is valid; otherwise, it mark-deletes the previous valid position. + // So we pass it the position that should be mark-deleted. + sub.expireMessages(position); } }); replicators.forEach((__, replicator) - -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); + -> ((PersistentReplicator) replicator).expireMessages(position)); shadowReplicators.forEach((__, replicator) - -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); - } + -> ((PersistentReplicator) replicator).expireMessages(position)); + }).exceptionally(ex -> { + log.error("[{}] Failed to expire messages by position", topic, ex); + return null; + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java similarity index 62% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java rename to pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java index 5535561a5fa..39aec66726e 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java @@ -16,28 +16,31 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.service.persistent; +package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.AssertJUnit.assertEquals; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,6 +62,11 @@ public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase { super.internalCleanup(); } + @Override + protected void doInitConf() throws Exception { + 
conf.setMessageExpiryCheckIntervalInMinutes(60); + } + /*** * Confirm the anti-concurrency mechanism "expirationCheckInProgressUpdater" works. */ @@ -76,7 +84,7 @@ public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase { (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName); - ManagedCursorImpl spyCursor = Mockito.spy(cursor); + ManagedCursorImpl spyCursor = spy(cursor); // Make the mark-deleting delay. CountDownLatch firstFindingCompleted = new CountDownLatch(1); @@ -98,14 +106,6 @@ public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase { calledFindPositionCount.incrementAndGet(); return invocationOnMock.callRealMethod(); }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean()); - doAnswer(invocationOnMock -> { - calledFindPositionCount.incrementAndGet(); - return invocationOnMock.callRealMethod(); - }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), anyBoolean()); - doAnswer(invocationOnMock -> { - calledFindPositionCount.incrementAndGet(); - return invocationOnMock.callRealMethod(); - }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any()); // Sleep 2s to make "find(1s)" get a position. Thread.sleep(2000); @@ -138,4 +138,57 @@ public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase { producer.close(); admin.topics().delete(topicName); } + + /*** + * Verify finding position task only executes once for multiple subscriptions of a topic. + */ + @Test(invocationCount = 2) + void testTopicExpireMessages() throws Exception { + // Create topic. 
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + final String cursorName1 = "s1"; + final String cursorName2 = "s2"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + admin.topics().createSubscriptionAsync(topicName, cursorName1, MessageId.earliest); + admin.topics().createSubscriptionAsync(topicName, cursorName2, MessageId.earliest); + admin.topicPolicies().setMessageTTL(topicName, 1); + Awaitility.await().untilAsserted(() -> { + assertEquals(1, persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get().intValue()); + }); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ml.getConfig().setMaxEntriesPerLedger(2); + ml.getConfig().setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + long firstLedger = ml.currentLedger.getId(); + System.out.println("maxEntriesPerLedger 1 : " + ml.getConfig().getMaxEntriesPerLedger()); + // Trigger 3 ledgers creation. + for (int i = 0; i < 5; i++) { + producer.send("" + i); + Thread.sleep(100); + } + System.out.println("maxEntriesPerLedger 2 : " + ml.getConfig().getMaxEntriesPerLedger()); + assertEquals(3, ml.getLedgersInfo().size()); + // Do a injection to count the access of the first ledger. 
+ AtomicInteger accessedCount = new AtomicInteger(); + ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get(); + ReadHandle spyReadHandle = spy(readHandle); + doAnswer(invocationOnMock -> { + long startEntry = (long) invocationOnMock.getArguments()[0]; + if (startEntry == 0) { + accessedCount.incrementAndGet(); + } + return invocationOnMock.callRealMethod(); + }).when(spyReadHandle).readAsync(anyLong(), anyLong()); + ml.ledgerCache.put(firstLedger, CompletableFuture.completedFuture(spyReadHandle)); + // Verify: the first ledger will be accessed only once after expiry for two subscriptions. + persistentTopic.checkMessageExpiry(); + Thread.sleep(2000); + assertEquals(1, accessedCount.get()); + + // cleanup. + producer.close(); + admin.topics().delete(topicName); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index c6fc9bd0eef..cf08dd48261 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1119,12 +1119,12 @@ public class PersistentTopicE2ETest extends BrokerTestBase { rolloverPerIntervalStats(); assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); - Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs)); + Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs - 1)); runMessageExpiryCheck(); assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs); - Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2)); + Thread.sleep(TimeUnit.SECONDS.toMillis(1 + messageTTLSecs / 2)); runMessageExpiryCheck(); assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java 
index 2d181d852a8..8bb05373aad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -801,11 +801,14 @@ public class PrometheusMetricsTest extends BrokerTestBase { p2.close(); // Let the message expire for (String topic : topicList) { + // The TTL value cannot be negative; the minimum value is 1. PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() .getTopicIfExists(topic).get().get(); - persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1); - persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1); + persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(1); + persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(1); } + // Wait 2 seconds for the messages to expire. + Thread.sleep(2000); pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry); //wait for checkMessageExpiry PersistentSubscription sub = (PersistentSubscription)