This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 835e9b60f0a [improve][broker] Make read compacted entries support 
maxReadSizeBytes limitation (#21065)
835e9b60f0a is described below

commit 835e9b60f0a0f94aa9fa641a2a33d4719391897b
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Sep 1 10:28:48 2023 +0800

    [improve][broker] Make read compacted entries support maxReadSizeBytes 
limitation (#21065)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +-
 .../apache/pulsar/compaction/CompactedTopic.java   |  3 +-
 .../pulsar/compaction/CompactedTopicImpl.java      | 15 ++++++-
 .../pulsar/compaction/CompactedTopicUtils.java     | 18 ++++++--
 .../pulsar/compaction/CompactedTopicUtilsTest.java |  5 ++-
 .../apache/pulsar/compaction/CompactionTest.java   | 49 ++++++++++++++++++++++
 .../pulsar/compaction/StrategicCompactionTest.java | 19 +++++++++
 7 files changed, 102 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e2b202cce15..a2420c1c29e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3490,7 +3490,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         return this.mbean;
     }
 
-    void updateReadStats(int readEntriesCount, long readEntriesSize) {
+    public void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
     }
@@ -3522,7 +3522,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }, null);
     }
 
-    private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
+    public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
         if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
             return maxEntries;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 660c7ea7797..8c17e0f3ca3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -38,7 +38,8 @@ public interface CompactedTopic {
      */
     @Deprecated
     void asyncReadEntriesOrWait(ManagedCursor cursor,
-                                int numberOfEntriesToRead,
+                                int maxEntries,
+                                long bytesToRead,
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index fe24a23b7cd..b028b708c49 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
@@ -93,7 +94,8 @@ public class CompactedTopicImpl implements CompactedTopic {
     @Override
     @Deprecated
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
-                                       int numberOfEntriesToRead,
+                                       int maxEntries,
+                                       long bytesToRead,
                                        boolean isFirstRead,
                                        ReadEntriesCallback callback, Consumer 
consumer) {
             PositionImpl cursorPosition;
@@ -110,8 +112,11 @@ public class CompactedTopicImpl implements CompactedTopic {
 
             if (currentCompactionHorizon == null
                 || currentCompactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
readEntriesCtx, PositionImpl.LATEST);
+                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, 
callback, readEntriesCtx, PositionImpl.LATEST);
             } else {
+                ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+                int numberOfEntriesToRead = 
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
                         .thenCompose((startPoint) -> {
@@ -126,6 +131,12 @@ public class CompactedTopicImpl implements CompactedTopic {
                                                          startPoint + 
(numberOfEntriesToRead - 1));
                                 return readEntries(context.ledger, startPoint, 
endPoint)
                                     .thenAccept((entries) -> {
+                                        long entriesSize = 0;
+                                        for (Entry entry : entries) {
+                                            entriesSize += entry.getLength();
+                                        }
+                                        
managedCursor.updateReadStats(entries.size(), entriesSize);
+
                                         Entry lastEntry = 
entries.get(entries.size() - 1);
                                         // The compaction task depends on the 
last snapshot and the incremental
                                         // entries to build the new snapshot. 
So for the compaction cursor, we
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 6acd33279fd..66bcf4c3002 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.broker.service.Consumer;
@@ -40,13 +41,13 @@ public class CompactedTopicUtils {
 
     @Beta
     public static void asyncReadCompactedEntries(TopicCompactionService 
topicCompactionService,
-                                                 ManagedCursor cursor, int 
numberOfEntriesToRead,
+                                                 ManagedCursor cursor, int 
maxEntries,
                                                  long bytesToRead, boolean 
readFromEarliest,
                                                  
AsyncCallbacks.ReadEntriesCallback callback,
                                                  boolean wait, @Nullable 
Consumer consumer) {
         Objects.requireNonNull(topicCompactionService);
         Objects.requireNonNull(cursor);
-        checkArgument(numberOfEntriesToRead > 0);
+        checkArgument(maxEntries > 0);
         Objects.requireNonNull(callback);
 
         final PositionImpl readPosition;
@@ -67,15 +68,18 @@ public class CompactedTopicUtils {
                     || readPosition.compareTo(
                     lastCompactedPosition.getLedgerId(), 
lastCompactedPosition.getEntryId()) > 0) {
                 if (wait) {
-                    cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, 
bytesToRead, callback, readEntriesCtx,
+                    cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, 
callback, readEntriesCtx,
                         PositionImpl.LATEST);
                 } else {
-                    cursor.asyncReadEntries(numberOfEntriesToRead, 
bytesToRead, callback, readEntriesCtx,
+                    cursor.asyncReadEntries(maxEntries, bytesToRead, callback, 
readEntriesCtx,
                         PositionImpl.LATEST);
                 }
                 return CompletableFuture.completedFuture(null);
             }
 
+            ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+            int numberOfEntriesToRead = 
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
             return topicCompactionService.readCompactedEntries(readPosition, 
numberOfEntriesToRead)
                     .thenAccept(entries -> {
                         if (CollectionUtils.isEmpty(entries)) {
@@ -88,6 +92,12 @@ public class CompactedTopicUtils {
                             return;
                         }
 
+                        long entriesSize = 0;
+                        for (Entry entry : entries) {
+                            entriesSize += entry.getLength();
+                        }
+                        managedCursor.updateReadStats(entries.size(), 
entriesSize);
+
                         Entry lastEntry = entries.get(entries.size() - 1);
                         cursor.seek(lastEntry.getPosition().getNext(), true);
                         callback.readEntriesComplete(entries, readEntriesCtx);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
index 329abf9f780..94f2a17a2a3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -25,8 +25,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -46,8 +46,9 @@ public class CompactedTopicUtilsTest {
 
         PositionImpl initPosition = PositionImpl.get(1, 90);
         AtomicReference<PositionImpl> readPositionRef = new 
AtomicReference<>(initPosition.getNext());
-        ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
+        ManagedCursorImpl cursor = Mockito.mock(ManagedCursorImpl.class);
         Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
+        Mockito.doReturn(1).when(cursor).applyMaxSizeCap(Mockito.anyInt(), 
Mockito.anyLong());
         Mockito.doAnswer(invocation -> {
             readPositionRef.set(invocation.getArgument(0));
             return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c5dbd9c49aa..afbbe6101f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -90,6 +91,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1877,4 +1879,51 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testDispatcherMaxReadSizeBytes() throws Exception {
+        final String topicName =
+                
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+        final int receiveQueueSize = 1;
+        @Cleanup
+        PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topicName).create();
+
+        for (int i = 0; i < 10; i+=2) {
+            producer.newMessage().key(null).value(new 
byte[4*1024*1024]).send();
+        }
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        admin.topics().unload(topicName);
+
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
client.newConsumer(Schema.BYTES)
+                
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+                .subscribe();
+
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        TopicCompactionService topicCompactionService = 
Mockito.spy(topic.getTopicCompactionService());
+        FieldUtils.writeDeclaredField(topic, "topicCompactionService", 
topicCompactionService, true);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(consumer.getStats().getMsgNumInReceiverQueue(),
+                    1);
+        });
+
+        consumer.increaseAvailablePermits(2);
+
+        Mockito.verify(topicCompactionService, 
Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));
+
+        consumer.close();
+        producer.close();
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 135a839bd54..799c2703e1e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -148,5 +148,24 @@ public class StrategicCompactionTest extends 
CompactionTest {
         Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
     }
 
+    @Override
+    public void testCompactCompressedBatching() throws Exception {
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 10);
+        super.testCompactCompressedBatching();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+    }
+
+    @Override
+    public void testCompactEncryptedAndCompressedBatching() throws Exception {
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 10);
+        super.testCompactEncryptedAndCompressedBatching();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+    }
 
+    @Override
+    public void testCompactEncryptedBatching() throws Exception {
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 10);
+        super.testCompactEncryptedBatching();
+        compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler, 1);
+    }
 }

Reply via email to