This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6295ee8a6ff [PIP-153][optimize][txn] Optimize metadataPositions in
MLPendingAckStore (#15137)
6295ee8a6ff is described below
commit 6295ee8a6ff666df8af42c6e34d43092baafd31c
Author: mattison chao <[email protected]>
AuthorDate: Wed May 25 15:43:06 2022 +0800
[PIP-153][optimize][txn] Optimize metadataPositions in MLPendingAckStore
(#15137)
Master Issue: https://github.com/apache/pulsar/issues/15073
Reduce the memory occupied by metadataPositions and avoid OOM
Regularly store a small amount of data according to certain rules, the
detailed implementation can be found in
[PIP153](https://github.com/apache/pulsar/issues/15073)
(cherry picked from commit ebca19b522fd9f4496689ca7d32ede345d28511a)
---
conf/broker.conf | 6 +
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +-
.../pendingack/impl/MLPendingAckStore.java | 170 +++++++++++----------
.../pendingack/impl/MLPendingAckStoreProvider.java | 11 +-
.../transaction/util/LogIndexLagBackoff.java | 49 ++++++
.../broker/transaction/util/package-info.java | 22 +++
.../pulsar/broker/transaction/TransactionTest.java | 4 +-
.../pendingack/PendingAckMetadataTest.java | 2 +-
.../pendingack/PendingAckPersistentTest.java | 105 +++++++++++++
.../pulsar/utils/LogIndexLagBackOffTest.java | 55 +++++++
10 files changed, 345 insertions(+), 90 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b67afa89c0b..3d11ce2538a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1328,6 +1328,12 @@ transactionBufferSnapshotMinTimeInMillis=5000
# The max concurrent requests for transaction buffer client, default is 1000
transactionBufferClientMaxConcurrentRequests=1000
+# MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
+# It stores the position in pendingAckStore as its value and saves a position
used to determine
+# whether the previous data can be cleaned up as a key.
+# transactionPendingAckLogIndexMinLag is used to configure the minimum lag
between indexes
+transactionPendingAckLogIndexMinLag=500
+
### --- Packages management service configuration variables (begin) --- ###
# Enable the packages management service or not
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 53709718545..dc5640f2315 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2257,7 +2257,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private long transactionBufferClientOperationTimeoutInMills = 3000L;
- /**** --- KeyStore TLS config variables --- ****/
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "MLPendingAckStore maintain a ConcurrentSkipListMap
pendingAckLogIndex`,"
+ + "it store the position in pendingAckStore as value and
save a position used to determine"
+ + "whether the previous data can be cleaned up as a key."
+ + "transactionPendingAckLogIndexMinLag is used to
configure the minimum lag between indexes"
+ )
+ private long transactionPendingAckLogIndexMinLag = 500L;
+
+ /**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "Enable TLS with KeyStore type configuration in broker"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index bb8b961e36b..5fc210ca741 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -42,6 +42,7 @@ import
org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import
org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
import
org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
+import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -72,33 +73,40 @@ public class MLPendingAckStore implements PendingAckStore {
private PositionImpl currentLoadPosition;
+ private final AtomicLong currentIndexLag = new AtomicLong(0);
+ private volatile long maxIndexLag;
+
+ protected PositionImpl maxAckPosition = PositionImpl.EARLIEST;
+ private final LogIndexLagBackoff logIndexBackoff;
+
/**
* The map is for pending ack store clear useless data.
* <p>
- * When ack message append to pending ack store, it will store the
position which is persistent as key.
+ * key:the largest ack position of origin topic, corresponds to the
value position.
* <p>
- * When ack message append to pending ack store, it will store the
position which is the max position of this
- * ack by the original topic as value.
+ * value:the position persistent by pendingAck log.
* <p>
- * It will judge the position with the max sub cursor position whether
smaller than the subCursor mark
+ * It will judge the position with the max sub cursor position (key)
whether smaller than the subCursor mark
* delete position.
* <p>
- * If the max position is smaller than the subCursor mark delete
position, the log cursor will mark delete
- * the position.
+ * If the max position (key) is smaller than the subCursor mark
delete position,
+ * the log cursor will mark delete the position before log
position (value).
*/
- private final ConcurrentSkipListMap<PositionImpl, PositionImpl>
metadataPositions;
+ private final ConcurrentSkipListMap<PositionImpl, PositionImpl>
pendingAckLogIndex;
private final ManagedCursor subManagedCursor;
public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
- ManagedCursor subManagedCursor) {
+ ManagedCursor subManagedCursor, long
transactionPendingAckLogIndexMinLag) {
this.managedLedger = managedLedger;
this.cursor = cursor;
this.currentLoadPosition = (PositionImpl)
this.cursor.getMarkDeletedPosition();
this.entryQueue = new SpscArrayQueue<>(2000);
this.lastConfirmedEntry = (PositionImpl)
managedLedger.getLastConfirmedEntry();
- this.metadataPositions = new ConcurrentSkipListMap<>();
+ this.pendingAckLogIndex = new ConcurrentSkipListMap<>();
this.subManagedCursor = subManagedCursor;
+ this.logIndexBackoff = new
LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
+ this.maxIndexLag = logIndexBackoff.next(0);
}
@Override
@@ -219,62 +227,12 @@ public class MLPendingAckStore implements PendingAckStore
{
log.debug("[{}][{}] MLPendingAckStore message append
success at {} txnId: {}, operation : {}",
managedLedger.getName(), ctx, position, txnID,
pendingAckMetadataEntry.getPendingAckOp());
}
- // store the persistent position in to memory
- if (pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.ABORT
- && pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.COMMIT) {
- Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
- .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
- o2.getLedgerId()).compare(o1.getEntryId(),
o2.getEntryId()).result());
- optional.ifPresent(pendingAckMetadata ->
- metadataPositions.compute((PositionImpl) position,
(thisPosition, otherPosition) -> {
- PositionImpl nowPosition =
PositionImpl.get(pendingAckMetadata.getLedgerId(),
- pendingAckMetadata.getEntryId());
- if (otherPosition == null) {
- return nowPosition;
- } else {
- return
nowPosition.compareTo(otherPosition) > 0 ? nowPosition : otherPosition;
- }
- }));
- }
-
+ currentIndexLag.incrementAndGet();
+ handleMetadataEntry((PositionImpl) position,
pendingAckMetadataEntry);
buf.release();
completableFuture.complete(null);
- if (!metadataPositions.isEmpty()) {
- PositionImpl deletePosition = null;
- while (!metadataPositions.isEmpty()
- && metadataPositions.firstKey() != null
- &&
subManagedCursor.getPersistentMarkDeletedPosition() != null
- && metadataPositions.firstEntry().getValue()
- .compareTo((PositionImpl)
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
- deletePosition = metadataPositions.firstKey();
- metadataPositions.remove(metadataPositions.firstKey());
- }
-
- if (deletePosition != null) {
- PositionImpl finalDeletePosition = deletePosition;
- cursor.asyncMarkDelete(deletePosition,
- new AsyncCallbacks.MarkDeleteCallback() {
- @Override
- public void markDeleteComplete(Object ctx)
{
- if (log.isDebugEnabled()) {
- log.debug("[{}] Transaction
pending ack store mark delete position : "
- + "[{}] success",
managedLedger.getName(),
- finalDeletePosition);
- }
- }
-
- @Override
- public void
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
- if (log.isDebugEnabled()) {
- log.error("[{}] Transaction
pending ack store mark delete position : "
- + "[{}] fail!",
managedLedger.getName(),
- finalDeletePosition,
exception);
- }
- }
- }, null);
- }
- }
+ clearUselessLogData();
}
@Override
@@ -292,6 +250,68 @@ public class MLPendingAckStore implements PendingAckStore {
return completableFuture;
}
+ private void handleMetadataEntry(PositionImpl logPosition,
PendingAckMetadataEntry pendingAckMetadataEntry) {
+ // store the persistent position in to memory
+ // store the max position of this entry retain
+ if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+ && pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.COMMIT) {
+ Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
+ .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
+ o2.getLedgerId()).compare(o1.getEntryId(),
o2.getEntryId()).result());
+
+ optional.ifPresent(pendingAckMetadata -> {
+ PositionImpl nowPosition =
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+ pendingAckMetadata.getEntryId());
+
+ if (nowPosition.compareTo(maxAckPosition) > 0) {
+ maxAckPosition = nowPosition;
+ }
+ if (currentIndexLag.get() >= maxIndexLag) {
+ pendingAckLogIndex.compute(maxAckPosition,
+ (thisPosition, otherPosition) -> logPosition);
+ maxIndexLag =
logIndexBackoff.next(pendingAckLogIndex.size());
+ currentIndexLag.set(0);
+ }
+ });
+ }
+ }
+
+ private void clearUselessLogData() {
+ if (!pendingAckLogIndex.isEmpty()) {
+ PositionImpl deletePosition = null;
+ while (!pendingAckLogIndex.isEmpty()
+ && pendingAckLogIndex.firstKey() != null
+ && subManagedCursor.getPersistentMarkDeletedPosition() !=
null
+ && pendingAckLogIndex.firstEntry().getKey()
+ .compareTo((PositionImpl)
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+ deletePosition =
pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
+ }
+
+ if (deletePosition != null) {
+ maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());
+ PositionImpl finalDeletePosition = deletePosition;
+ cursor.asyncMarkDelete(deletePosition,
+ new AsyncCallbacks.MarkDeleteCallback() {
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Transaction pending ack
store mark delete position : "
+ + "[{}] success",
managedLedger.getName(),
+ finalDeletePosition);
+ }
+ }
+
+ @Override
+ public void
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+ log.error("[{}] Transaction pending ack store
mark delete position : "
+ + "[{}] fail!",
managedLedger.getName(),
+ finalDeletePosition, exception);
+ }
+ }, null);
+ }
+ }
+ }
+
class PendingAckReplay implements Runnable {
private final FillEntryQueueCallback fillEntryQueueCallback;
@@ -319,30 +339,12 @@ public class MLPendingAckStore implements PendingAckStore
{
currentLoadPosition =
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
PendingAckMetadataEntry pendingAckMetadataEntry = new
PendingAckMetadataEntry();
pendingAckMetadataEntry.parseFrom(buffer,
buffer.readableBytes());
- // store the persistent position in to memory
- // store the max position of this entry retain
- if (pendingAckMetadataEntry.getPendingAckOp() !=
PendingAckOp.ABORT
- && pendingAckMetadataEntry.getPendingAckOp()
!= PendingAckOp.COMMIT) {
- Optional<PendingAckMetadata> optional =
pendingAckMetadataEntry.getPendingAckMetadatasList()
- .stream().max((o1, o2) ->
ComparisonChain.start().compare(o1.getLedgerId(),
-
o2.getLedgerId()).compare(o1.getEntryId(), o2.getEntryId()).result());
-
- optional.ifPresent(pendingAckMetadata ->
-
metadataPositions.compute(PositionImpl.get(entry.getLedgerId(),
entry.getEntryId()),
- (thisPosition, otherPosition) -> {
- PositionImpl nowPosition =
PositionImpl
-
.get(pendingAckMetadata.getLedgerId(),
-
pendingAckMetadata.getEntryId());
- if (otherPosition == null) {
- return nowPosition;
- } else {
- return
nowPosition.compareTo(otherPosition) > 0 ? nowPosition
- : otherPosition;
- }
- }));
- }
+ currentIndexLag.incrementAndGet();
+ handleMetadataEntry(new
PositionImpl(entry.getLedgerId(), entry.getEntryId()),
+ pendingAckMetadataEntry);
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
entry.release();
+ clearUselessLogData();
} else {
try {
Thread.sleep(1);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 5417caec3d7..6b84d6e329a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -74,9 +74,14 @@ public class MLPendingAckStoreProvider implements
TransactionPendingAckStoreProv
InitialPosition.Earliest, new
AsyncCallbacks.OpenCursorCallback() {
@Override
public void
openCursorComplete(ManagedCursor cursor, Object ctx) {
- pendingAckStoreFuture
- .complete(new
MLPendingAckStore(ledger, cursor,
-
subscription.getCursor()));
+
pendingAckStoreFuture.complete(new MLPendingAckStore(ledger,
+ cursor,
+
subscription.getCursor(),
+
originPersistentTopic
+
.getBrokerService()
+
.getPulsar()
+
.getConfiguration()
+
.getTransactionPendingAckLogIndexMinLag()));
if
(log.isDebugEnabled()) {
log.debug("{},{}
open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
new file mode 100644
index 00000000000..145381814ba
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.Getter;
+
+public class LogIndexLagBackoff {
+
+ @Getter
+ private final long minLag;
+ @Getter
+ private final long maxLag;
+ @Getter
+ private final double exponent;
+
+ public LogIndexLagBackoff(long minLag, long maxLag, double exponent) {
+ checkArgument(minLag > 0, "min lag must be > 0");
+ checkArgument(maxLag >= minLag, "maxLag should be >= minLag");
+ checkArgument(exponent > 0, "exponent must be > 0");
+ this.minLag = minLag;
+ this.maxLag = maxLag;
+ this.exponent = exponent;
+ }
+
+
+ public long next(int indexCount) {
+ if (indexCount <= 0) {
+ return minLag;
+ }
+ return (long) Math.min(this.maxLag, minLag * Math.pow(indexCount,
exponent));
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
new file mode 100644
index 00000000000..58cb1c24b19
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction tools.
+ */
+package org.apache.pulsar.broker.transaction.util;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f264289d696..0b59eda4523 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -620,7 +620,7 @@ public class TransactionTest extends TransactionTestBase {
TransactionPendingAckStoreProvider pendingAckStoreProvider =
mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
- new MLPendingAckStore(persistentTopic.getManagedLedger(),
managedCursor, null)))
+ new MLPendingAckStore(persistentTopic.getManagedLedger(),
managedCursor, null, 500)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
@@ -901,6 +901,8 @@ public class TransactionTest extends TransactionTestBase {
@Test
public void testPendingAckMarkDeletePosition() throws Exception {
+
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1);
+
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
String topic = NAMESPACE1 + "/test1";
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
index c99eee62463..14dbcdb8897 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
@@ -72,7 +72,7 @@ public class PendingAckMetadataTest extends
MockedBookKeeperTestCase {
ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
- new MLPendingAckStore(completableFuture.get(), cursor,
subCursor);
+ new MLPendingAckStore(completableFuture.get(), cursor,
subCursor, 500);
Field field =
MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 196d9bbebaa..a64707f1ae8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -48,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -57,6 +59,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -200,6 +203,8 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
@Test
public void cumulativePendingAckReplayTest() throws Exception {
int messageCount = 1000;
+
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(4
* messageCount + 2);
+
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(10);
String subName = "cumulative-test";
@Cleanup
@@ -354,6 +359,106 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
assertFalse(topics.contains(topic));
}
+ @Test
+ public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception {
+
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(5);
+
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
+ String subName = "test-log-delete";
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "test-log-delete").toString();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < 20; i++) {
+ producer.newMessage().send();
+ }
+ // init
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(topic, false).get().get();
+
+ PersistentSubscription persistentSubscription =
persistentTopic.getSubscription(subName);
+ Field field =
PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+ field.setAccessible(true);
+ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl)
field.get(persistentSubscription);
+ Field field1 =
PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+ field1.setAccessible(true);
+ PendingAckStore pendingAckStore =
((CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle)).get();
+
+ Field field3 =
MLPendingAckStore.class.getDeclaredField("pendingAckLogIndex");
+ Field field4 = MLPendingAckStore.class.getDeclaredField("maxIndexLag");
+
+ field3.setAccessible(true);
+ field4.setAccessible(true);
+
+ ConcurrentSkipListMap<PositionImpl, PositionImpl> pendingAckLogIndex =
+ (ConcurrentSkipListMap<PositionImpl, PositionImpl>)
field3.get(pendingAckStore);
+ long maxIndexLag = (long) field4.get(pendingAckStore);
+ Assert.assertEquals(pendingAckLogIndex.size(), 0);
+ Assert.assertEquals(maxIndexLag, 5);
+ transaction.commit().get();
+
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertEquals(persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(),
+ ((MessageIdImpl)message.getMessageId()).getEntryId()));
+ // 7 more acks. Will find that there are still only two records in the
map.
+ Transaction transaction1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ Message<byte[]> message0 = null;
+ //remove previous index
+ for (int i = 0; i < 4; i++) {
+ message0 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message0.getMessageId(),
transaction1).get();
+ }
+ Assert.assertEquals(pendingAckLogIndex.size(), 1);
+ maxIndexLag = (long) field4.get(pendingAckStore);
+ Assert.assertEquals(maxIndexLag, 5);
+ //add new index
+ for (int i = 0; i < 9; i++) {
+ message0= consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message0.getMessageId(),
transaction1).get();
+ }
+
+ Assert.assertEquals(pendingAckLogIndex.size(), 2);
+ maxIndexLag = (long) field4.get(pendingAckStore);
+ Assert.assertEquals(maxIndexLag, 10);
+
+ transaction1.commit().get();
+ Message<byte[]> message1 = message0;
+ Awaitility.await().untilAsserted(() ->
+
Assert.assertEquals(persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(),
+
((MessageIdImpl)message1.getMessageId()).getEntryId()));
+
+ Transaction transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get();
+
+ Assert.assertEquals(pendingAckLogIndex.size(), 0);
+ maxIndexLag = (long) field4.get(pendingAckStore);
+ Assert.assertEquals(maxIndexLag, 5);
+ }
+
@Test
public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
String topic = TopicName.get(TopicDomain.persistent.toString(),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
new file mode 100644
index 00000000000..8d4f2c356a6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "utils")
+public class LogIndexLagBackOffTest {
+ @Test
+ public void testGenerateNextLogIndexLag() {
+ LogIndexLagBackoff logIndexLagBackoff = new LogIndexLagBackoff(1, 10,
1);
+ Assert.assertEquals(logIndexLagBackoff.next(0), 1);
+ Assert.assertEquals(logIndexLagBackoff.next(6), 6);
+
+ Assert.assertEquals(logIndexLagBackoff.next(77), 10);
+
+ logIndexLagBackoff = new LogIndexLagBackoff(1, 10, 2);
+ Assert.assertEquals(logIndexLagBackoff.next(3), 9);
+
+ try {
+ new LogIndexLagBackoff(-1, 2, 3);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "min lag must be > 0");
+ }
+ try {
+ new LogIndexLagBackoff(2, 1, 3);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "maxLag should be >= minLag");
+ }
+ try {
+ new LogIndexLagBackoff(1, 1, 0.2);
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "exponent must be > 0");
+ }
+
+ }
+}