This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 6295ee8a6ff [PIP-153][optimize][txn]  Optimize metadataPositions in 
MLPendingAckStore (#15137)
6295ee8a6ff is described below

commit 6295ee8a6ff666df8af42c6e34d43092baafd31c
Author: mattison chao <[email protected]>
AuthorDate: Wed May 25 15:43:06 2022 +0800

    [PIP-153][optimize][txn]  Optimize metadataPositions in MLPendingAckStore 
(#15137)
    
    Master Issue: https://github.com/apache/pulsar/issues/15073
    Reduce the memory occupied by metadataPositions and avoid OOM.
    Regularly store only a small amount of data according to certain rules; the 
detailed implementation can be found in 
[PIP153](https://github.com/apache/pulsar/issues/15073).
    
    (cherry picked from commit ebca19b522fd9f4496689ca7d32ede345d28511a)
---
 conf/broker.conf                                   |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +-
 .../pendingack/impl/MLPendingAckStore.java         | 170 +++++++++++----------
 .../pendingack/impl/MLPendingAckStoreProvider.java |  11 +-
 .../transaction/util/LogIndexLagBackoff.java       |  49 ++++++
 .../broker/transaction/util/package-info.java      |  22 +++
 .../pulsar/broker/transaction/TransactionTest.java |   4 +-
 .../pendingack/PendingAckMetadataTest.java         |   2 +-
 .../pendingack/PendingAckPersistentTest.java       | 105 +++++++++++++
 .../pulsar/utils/LogIndexLagBackOffTest.java       |  55 +++++++
 10 files changed, 345 insertions(+), 90 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b67afa89c0b..3d11ce2538a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1328,6 +1328,12 @@ transactionBufferSnapshotMinTimeInMillis=5000
 # The max concurrent requests for transaction buffer client, default is 1000
 transactionBufferClientMaxConcurrentRequests=1000
 
+# MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
+# It stores the position in pendingAckStore as its value and saves a position 
used to determine
+# whether the previous data can be cleaned up as a key.
+# transactionPendingAckLogIndexMinLag is used to configure the minimum lag 
between indexes
+transactionPendingAckLogIndexMinLag=500
+
 ### --- Packages management service configuration variables (begin) --- ###
 
 # Enable the packages management service or not
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 53709718545..dc5640f2315 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2257,7 +2257,16 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private long transactionBufferClientOperationTimeoutInMills = 3000L;
 
-    /**** --- KeyStore TLS config variables --- ****/
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "MLPendingAckStore maintain a ConcurrentSkipListMap 
pendingAckLogIndex`,"
+                    + "it store the position in pendingAckStore as value and 
save a position used to determine"
+                    + "whether the previous data can be cleaned up as a key."
+                    + "transactionPendingAckLogIndexMinLag is used to 
configure the minimum lag between indexes"
+    )
+    private long transactionPendingAckLogIndexMinLag = 500L;
+
+    /**** --- KeyStore TLS config variables. --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
             doc = "Enable TLS with KeyStore type configuration in broker"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index bb8b961e36b..5fc210ca741 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -42,6 +42,7 @@ import 
org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata;
 import 
org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry;
 import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp;
+import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -72,33 +73,40 @@ public class MLPendingAckStore implements PendingAckStore {
 
     private PositionImpl currentLoadPosition;
 
+    private final AtomicLong currentIndexLag = new AtomicLong(0);
+    private volatile long maxIndexLag;
+
+    protected PositionImpl maxAckPosition = PositionImpl.EARLIEST;
+    private final LogIndexLagBackoff logIndexBackoff;
+
     /**
      * The map is for pending ack store clear useless data.
      * <p>
-     *     When ack message append to pending ack store, it will store the 
position which is persistent as key.
+     *     key:the largest ack position of origin topic, corresponds to the 
value position.
      * <p>
-     *     When ack message append to pending ack store, it will store the 
position which is the max position of this
-     *     ack by the original topic as value.
+     *     value:the position persistent by pendingAck log.
      * <p>
-     *     It will judge the position with the max sub cursor position whether 
smaller than the subCursor mark
+     *     It will judge the position with the max sub cursor position (key) 
whether smaller than the subCursor mark
      *     delete position.
      *     <p>
-     *         If the max position is smaller than the subCursor mark delete 
position, the log cursor will mark delete
-     *         the position.
+     *         If the max position (key) is smaller than the subCursor mark 
delete position,
+     *         the log cursor will mark delete the position before log 
position (value).
      */
-    private final ConcurrentSkipListMap<PositionImpl, PositionImpl> 
metadataPositions;
+    private final ConcurrentSkipListMap<PositionImpl, PositionImpl> 
pendingAckLogIndex;
 
     private final ManagedCursor subManagedCursor;
 
     public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
-                             ManagedCursor subManagedCursor) {
+                             ManagedCursor subManagedCursor, long 
transactionPendingAckLogIndexMinLag) {
         this.managedLedger = managedLedger;
         this.cursor = cursor;
         this.currentLoadPosition = (PositionImpl) 
this.cursor.getMarkDeletedPosition();
         this.entryQueue = new SpscArrayQueue<>(2000);
         this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
-        this.metadataPositions = new ConcurrentSkipListMap<>();
+        this.pendingAckLogIndex = new ConcurrentSkipListMap<>();
         this.subManagedCursor = subManagedCursor;
+        this.logIndexBackoff = new 
LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
+        this.maxIndexLag = logIndexBackoff.next(0);
     }
 
     @Override
@@ -219,62 +227,12 @@ public class MLPendingAckStore implements PendingAckStore 
{
                     log.debug("[{}][{}] MLPendingAckStore message append 
success at {} txnId: {}, operation : {}",
                             managedLedger.getName(), ctx, position, txnID, 
pendingAckMetadataEntry.getPendingAckOp());
                 }
-                // store the persistent position in to memory
-                if (pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.ABORT
-                        && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
-                    Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
-                            .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
-                                    o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
-                    optional.ifPresent(pendingAckMetadata ->
-                            metadataPositions.compute((PositionImpl) position, 
(thisPosition, otherPosition) -> {
-                                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
-                                        pendingAckMetadata.getEntryId());
-                                if (otherPosition == null) {
-                                    return nowPosition;
-                                } else {
-                                    return 
nowPosition.compareTo(otherPosition) > 0 ? nowPosition : otherPosition;
-                                }
-                    }));
-                }
-
+                currentIndexLag.incrementAndGet();
+                handleMetadataEntry((PositionImpl) position, 
pendingAckMetadataEntry);
                 buf.release();
                 completableFuture.complete(null);
 
-                if (!metadataPositions.isEmpty()) {
-                    PositionImpl deletePosition = null;
-                    while (!metadataPositions.isEmpty()
-                            && metadataPositions.firstKey() != null
-                            && 
subManagedCursor.getPersistentMarkDeletedPosition() != null
-                            && metadataPositions.firstEntry().getValue()
-                            .compareTo((PositionImpl) 
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
-                        deletePosition = metadataPositions.firstKey();
-                        metadataPositions.remove(metadataPositions.firstKey());
-                    }
-
-                    if (deletePosition != null) {
-                        PositionImpl finalDeletePosition = deletePosition;
-                        cursor.asyncMarkDelete(deletePosition,
-                                new AsyncCallbacks.MarkDeleteCallback() {
-                                    @Override
-                                    public void markDeleteComplete(Object ctx) 
{
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("[{}] Transaction 
pending ack store mark delete position : "
-                                                            + "[{}] success", 
managedLedger.getName(),
-                                                    finalDeletePosition);
-                                        }
-                                    }
-
-                                    @Override
-                                    public void 
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
-                                        if (log.isDebugEnabled()) {
-                                            log.error("[{}] Transaction 
pending ack store mark delete position : "
-                                                            + "[{}] fail!", 
managedLedger.getName(),
-                                                    finalDeletePosition, 
exception);
-                                        }
-                                    }
-                                }, null);
-                    }
-                }
+                clearUselessLogData();
             }
 
             @Override
@@ -292,6 +250,68 @@ public class MLPendingAckStore implements PendingAckStore {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (currentIndexLag.get() >= maxIndexLag) {
+                    pendingAckLogIndex.compute(maxAckPosition,
+                            (thisPosition, otherPosition) -> logPosition);
+                    maxIndexLag = 
logIndexBackoff.next(pendingAckLogIndex.size());
+                    currentIndexLag.set(0);
+                }
+            });
+        }
+    }
+
+    private void clearUselessLogData() {
+        if (!pendingAckLogIndex.isEmpty()) {
+            PositionImpl deletePosition = null;
+            while (!pendingAckLogIndex.isEmpty()
+                    && pendingAckLogIndex.firstKey() != null
+                    && subManagedCursor.getPersistentMarkDeletedPosition() != 
null
+                    && pendingAckLogIndex.firstEntry().getKey()
+                    .compareTo((PositionImpl) 
subManagedCursor.getPersistentMarkDeletedPosition()) <= 0) {
+                deletePosition = 
pendingAckLogIndex.remove(pendingAckLogIndex.firstKey());
+            }
+
+            if (deletePosition != null) {
+                maxIndexLag = logIndexBackoff.next(pendingAckLogIndex.size());
+                PositionImpl finalDeletePosition = deletePosition;
+                cursor.asyncMarkDelete(deletePosition,
+                        new AsyncCallbacks.MarkDeleteCallback() {
+                            @Override
+                            public void markDeleteComplete(Object ctx) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Transaction pending ack 
store mark delete position : "
+                                                    + "[{}] success", 
managedLedger.getName(),
+                                            finalDeletePosition);
+                                }
+                            }
+
+                            @Override
+                            public void 
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                                log.error("[{}] Transaction pending ack store 
mark delete position : "
+                                                + "[{}] fail!", 
managedLedger.getName(),
+                                        finalDeletePosition, exception);
+                            }
+                        }, null);
+            }
+        }
+    }
+
     class PendingAckReplay implements Runnable {
 
         private final FillEntryQueueCallback fillEntryQueueCallback;
@@ -319,30 +339,12 @@ public class MLPendingAckStore implements PendingAckStore 
{
                         currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                         PendingAckMetadataEntry pendingAckMetadataEntry = new 
PendingAckMetadataEntry();
                         pendingAckMetadataEntry.parseFrom(buffer, 
buffer.readableBytes());
-                        // store the persistent position in to memory
-                        // store the max position of this entry retain
-                        if (pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.ABORT
-                                && pendingAckMetadataEntry.getPendingAckOp() 
!= PendingAckOp.COMMIT) {
-                            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
-                                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
-                                            
o2.getLedgerId()).compare(o1.getEntryId(), o2.getEntryId()).result());
-
-                            optional.ifPresent(pendingAckMetadata ->
-                                    
metadataPositions.compute(PositionImpl.get(entry.getLedgerId(), 
entry.getEntryId()),
-                                            (thisPosition, otherPosition) -> {
-                                                PositionImpl nowPosition = 
PositionImpl
-                                                        
.get(pendingAckMetadata.getLedgerId(),
-                                                                
pendingAckMetadata.getEntryId());
-                                                if (otherPosition == null) {
-                                                    return nowPosition;
-                                                } else {
-                                                    return 
nowPosition.compareTo(otherPosition) > 0 ? nowPosition
-                                                            : otherPosition;
-                                                }
-                                            }));
-                        }
+                        currentIndexLag.incrementAndGet();
+                        handleMetadataEntry(new 
PositionImpl(entry.getLedgerId(), entry.getEntryId()),
+                                pendingAckMetadataEntry);
                         
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
                         entry.release();
+                        clearUselessLogData();
                     } else {
                         try {
                             Thread.sleep(1);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 5417caec3d7..6b84d6e329a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -74,9 +74,14 @@ public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProv
                                                 InitialPosition.Earliest, new 
AsyncCallbacks.OpenCursorCallback() {
                                                     @Override
                                                     public void 
openCursorComplete(ManagedCursor cursor, Object ctx) {
-                                                        pendingAckStoreFuture
-                                                                .complete(new 
MLPendingAckStore(ledger, cursor,
-                                                                        
subscription.getCursor()));
+                                                        
pendingAckStoreFuture.complete(new MLPendingAckStore(ledger,
+                                                                cursor,
+                                                                
subscription.getCursor(),
+                                                                
originPersistentTopic
+                                                                        
.getBrokerService()
+                                                                        
.getPulsar()
+                                                                        
.getConfiguration()
+                                                                        
.getTransactionPendingAckLogIndexMinLag()));
                                                         if 
(log.isDebugEnabled()) {
                                                             log.debug("{},{} 
open MLPendingAckStore cursor success",
                                                                     
originPersistentTopic.getName(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
new file mode 100644
index 00000000000..145381814ba
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.Getter;
+
+public class LogIndexLagBackoff {
+
+    @Getter
+    private final long minLag;
+    @Getter
+    private final long maxLag;
+    @Getter
+    private final double exponent;
+
+    public LogIndexLagBackoff(long minLag, long maxLag, double exponent) {
+        checkArgument(minLag > 0, "min lag must be > 0");
+        checkArgument(maxLag >= minLag, "maxLag should be >= minLag");
+        checkArgument(exponent > 0, "exponent must be > 0");
+        this.minLag = minLag;
+        this.maxLag = maxLag;
+        this.exponent = exponent;
+    }
+
+
+    public long next(int indexCount) {
+        if (indexCount <= 0) {
+            return minLag;
+        }
+        return (long) Math.min(this.maxLag, minLag * Math.pow(indexCount, 
exponent));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
new file mode 100644
index 00000000000..58cb1c24b19
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction tools.
+ */
+package org.apache.pulsar.broker.transaction.util;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f264289d696..0b59eda4523 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -620,7 +620,7 @@ public class TransactionTest extends TransactionTestBase {
 
         TransactionPendingAckStoreProvider pendingAckStoreProvider = 
mock(TransactionPendingAckStoreProvider.class);
         doReturn(CompletableFuture.completedFuture(
-                new MLPendingAckStore(persistentTopic.getManagedLedger(), 
managedCursor, null)))
+                new MLPendingAckStore(persistentTopic.getManagedLedger(), 
managedCursor, null, 500)))
                 .when(pendingAckStoreProvider).newPendingAckStore(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());
 
@@ -901,6 +901,8 @@ public class TransactionTest extends TransactionTestBase {
 
     @Test
     public void testPendingAckMarkDeletePosition() throws Exception {
+        
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(1);
+        
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
         String topic = NAMESPACE1 + "/test1";
 
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
index c99eee62463..14dbcdb8897 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java
@@ -72,7 +72,7 @@ public class PendingAckMetadataTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor = completableFuture.get().openCursor("test");
         ManagedCursor subCursor = completableFuture.get().openCursor("test");
         MLPendingAckStore pendingAckStore =
-                new MLPendingAckStore(completableFuture.get(), cursor, 
subCursor);
+                new MLPendingAckStore(completableFuture.get(), cursor, 
subCursor, 500);
 
         Field field = 
MLPendingAckStore.class.getDeclaredField("managedLedger");
         field.setAccessible(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 196d9bbebaa..a64707f1ae8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -48,6 +49,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -57,6 +59,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -200,6 +203,8 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
     @Test
     public void cumulativePendingAckReplayTest() throws Exception {
         int messageCount = 1000;
+        
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(4
 * messageCount + 2);
+        
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(10);
         String subName = "cumulative-test";
 
         @Cleanup
@@ -354,6 +359,106 @@ public class PendingAckPersistentTest extends 
TransactionTestBase {
         assertFalse(topics.contains(topic));
     }
 
+    @Test
+    public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception {
+        
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(5);
+        
getPulsarServiceList().get(0).getConfiguration().setManagedLedgerDefaultMarkDeleteRateLimit(5);
+        String subName = "test-log-delete";
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "test-log-delete").toString();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < 20; i++) {
+            producer.newMessage().send();
+        }
+        // init
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(topic, false).get().get();
+
+        PersistentSubscription persistentSubscription = 
persistentTopic.getSubscription(subName);
+        Field field = 
PersistentSubscription.class.getDeclaredField("pendingAckHandle");
+        field.setAccessible(true);
+        PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) 
field.get(persistentSubscription);
+        Field field1 = 
PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+        field1.setAccessible(true);
+        PendingAckStore pendingAckStore = 
((CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle)).get();
+
+        Field field3 = 
MLPendingAckStore.class.getDeclaredField("pendingAckLogIndex");
+        Field field4 = MLPendingAckStore.class.getDeclaredField("maxIndexLag");
+
+        field3.setAccessible(true);
+        field4.setAccessible(true);
+
+        ConcurrentSkipListMap<PositionImpl, PositionImpl> pendingAckLogIndex =
+                (ConcurrentSkipListMap<PositionImpl, PositionImpl>) 
field3.get(pendingAckStore);
+        long maxIndexLag = (long) field4.get(pendingAckStore);
+        Assert.assertEquals(pendingAckLogIndex.size(), 0);
+        Assert.assertEquals(maxIndexLag, 5);
+        transaction.commit().get();
+
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(),
+                        ((MessageIdImpl)message.getMessageId()).getEntryId()));
+        // 7 more acks. Will find that there are still only two records in the 
map.
+        Transaction transaction1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Message<byte[]> message0 = null;
+        //remove previous index
+        for (int i = 0; i < 4; i++) {
+            message0 = consumer.receive(5, TimeUnit.SECONDS);
+            consumer.acknowledgeAsync(message0.getMessageId(), 
transaction1).get();
+        }
+        Assert.assertEquals(pendingAckLogIndex.size(), 1);
+        maxIndexLag = (long) field4.get(pendingAckStore);
+        Assert.assertEquals(maxIndexLag, 5);
+        //add new index
+        for (int i = 0; i < 9; i++) {
+            message0= consumer.receive(5, TimeUnit.SECONDS);
+            consumer.acknowledgeAsync(message0.getMessageId(), 
transaction1).get();
+        }
+
+        Assert.assertEquals(pendingAckLogIndex.size(), 2);
+        maxIndexLag = (long) field4.get(pendingAckStore);
+        Assert.assertEquals(maxIndexLag, 10);
+
+        transaction1.commit().get();
+        Message<byte[]> message1 = message0;
+        Awaitility.await().untilAsserted(() ->
+                
Assert.assertEquals(persistentSubscription.getCursor().getPersistentMarkDeletedPosition().getEntryId(),
+                        
((MessageIdImpl)message1.getMessageId()).getEntryId()));
+
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+        consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get();
+
+        Assert.assertEquals(pendingAckLogIndex.size(), 0);
+        maxIndexLag = (long) field4.get(pendingAckStore);
+        Assert.assertEquals(maxIndexLag, 5);
+    }
+
     @Test
     public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
         String topic = TopicName.get(TopicDomain.persistent.toString(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
new file mode 100644
index 00000000000..8d4f2c356a6
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/LogIndexLagBackOffTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/** Unit tests for {@link LogIndexLagBackoff}: lag values and constructor argument checks. */
+@Test(groups = "utils")
+public class LogIndexLagBackOffTest {
+    /** Verifies the lags returned by {@code next} and that invalid arguments are rejected. */
+    @Test
+    public void testGenerateNextLogIndexLag() {
+        // min=1, max=10, exponent=1: results are expected to stay within [1, 10].
+        LogIndexLagBackoff logIndexLagBackoff = new LogIndexLagBackoff(1, 10, 1);
+        Assert.assertEquals(logIndexLagBackoff.next(0), 1);
+        Assert.assertEquals(logIndexLagBackoff.next(6), 6);
+        Assert.assertEquals(logIndexLagBackoff.next(77), 10);
+
+        logIndexLagBackoff = new LogIndexLagBackoff(1, 10, 2);
+        Assert.assertEquals(logIndexLagBackoff.next(3), 9);
+
+        // expectThrows fails the test when nothing is thrown; a bare try/catch would pass
+        // silently in that case and never verify the message.
+        IllegalArgumentException e = Assert.expectThrows(IllegalArgumentException.class,
+                () -> new LogIndexLagBackoff(-1, 2, 3));
+        Assert.assertEquals(e.getMessage(), "min lag must be > 0");
+
+        e = Assert.expectThrows(IllegalArgumentException.class,
+                () -> new LogIndexLagBackoff(2, 1, 3));
+        Assert.assertEquals(e.getMessage(), "maxLag should be >= minLag");
+
+        // Exponent 0 violates "exponent must be > 0"; the former 0.2 satisfies that constraint.
+        e = Assert.expectThrows(IllegalArgumentException.class,
+                () -> new LogIndexLagBackoff(1, 1, 0));
+        Assert.assertEquals(e.getMessage(), "exponent must be > 0");
+    }
+}

Reply via email to