This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 835e9b60f0a [improve][broker] Make read compacted entries support
maxReadSizeBytes limitation (#21065)
835e9b60f0a is described below
commit 835e9b60f0a0f94aa9fa641a2a33d4719391897b
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Sep 1 10:28:48 2023 +0800
[improve][broker] Make read compacted entries support maxReadSizeBytes
limitation (#21065)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 4 +-
.../apache/pulsar/compaction/CompactedTopic.java | 3 +-
.../pulsar/compaction/CompactedTopicImpl.java | 15 ++++++-
.../pulsar/compaction/CompactedTopicUtils.java | 18 ++++++--
.../pulsar/compaction/CompactedTopicUtilsTest.java | 5 ++-
.../apache/pulsar/compaction/CompactionTest.java | 49 ++++++++++++++++++++++
.../pulsar/compaction/StrategicCompactionTest.java | 19 +++++++++
7 files changed, 102 insertions(+), 11 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e2b202cce15..a2420c1c29e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -3490,7 +3490,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return this.mbean;
}
- void updateReadStats(int readEntriesCount, long readEntriesSize) {
+ public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
}
@@ -3522,7 +3522,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}, null);
}
- private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
+ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 660c7ea7797..8c17e0f3ca3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -38,7 +38,8 @@ public interface CompactedTopic {
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
- int numberOfEntriesToRead,
+ int maxEntries,
+ long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index fe24a23b7cd..b028b708c49 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
@@ -93,7 +94,8 @@ public class CompactedTopicImpl implements CompactedTopic {
@Override
@Deprecated
public void asyncReadEntriesOrWait(ManagedCursor cursor,
- int numberOfEntriesToRead,
+ int maxEntries,
+ long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer
consumer) {
PositionImpl cursorPosition;
@@ -110,8 +112,11 @@ public class CompactedTopicImpl implements CompactedTopic {
if (currentCompactionHorizon == null
|| currentCompactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback,
readEntriesCtx, PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx, PositionImpl.LATEST);
} else {
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+ int numberOfEntriesToRead =
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition,
context.ledger.getLastAddConfirmed(), context.cache)
.thenCompose((startPoint) -> {
@@ -126,6 +131,12 @@ public class CompactedTopicImpl implements CompactedTopic {
startPoint +
(numberOfEntriesToRead - 1));
return readEntries(context.ledger, startPoint,
endPoint)
.thenAccept((entries) -> {
+ long entriesSize = 0;
+ for (Entry entry : entries) {
+ entriesSize += entry.getLength();
+ }
+
managedCursor.updateReadStats(entries.size(), entriesSize);
+
Entry lastEntry =
entries.get(entries.size() - 1);
// The compaction task depends on the
last snapshot and the incremental
// entries to build the new snapshot.
So for the compaction cursor, we
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 6acd33279fd..66bcf4c3002 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.service.Consumer;
@@ -40,13 +41,13 @@ public class CompactedTopicUtils {
@Beta
public static void asyncReadCompactedEntries(TopicCompactionService
topicCompactionService,
- ManagedCursor cursor, int
numberOfEntriesToRead,
+ ManagedCursor cursor, int
maxEntries,
long bytesToRead, boolean
readFromEarliest,
AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable
Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
- checkArgument(numberOfEntriesToRead > 0);
+ checkArgument(maxEntries > 0);
Objects.requireNonNull(callback);
final PositionImpl readPosition;
@@ -67,15 +68,18 @@ public class CompactedTopicUtils {
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(),
lastCompactedPosition.getEntryId()) > 0) {
if (wait) {
- cursor.asyncReadEntriesOrWait(numberOfEntriesToRead,
bytesToRead, callback, readEntriesCtx,
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx,
PositionImpl.LATEST);
} else {
- cursor.asyncReadEntries(numberOfEntriesToRead,
bytesToRead, callback, readEntriesCtx,
+ cursor.asyncReadEntries(maxEntries, bytesToRead, callback,
readEntriesCtx,
PositionImpl.LATEST);
}
return CompletableFuture.completedFuture(null);
}
+ ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
+ int numberOfEntriesToRead =
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
+
return topicCompactionService.readCompactedEntries(readPosition,
numberOfEntriesToRead)
.thenAccept(entries -> {
if (CollectionUtils.isEmpty(entries)) {
@@ -88,6 +92,12 @@ public class CompactedTopicUtils {
return;
}
+ long entriesSize = 0;
+ for (Entry entry : entries) {
+ entriesSize += entry.getLength();
+ }
+ managedCursor.updateReadStats(entries.size(),
entriesSize);
+
Entry lastEntry = entries.get(entries.size() - 1);
cursor.seek(lastEntry.getPosition().getNext(), true);
callback.readEntriesComplete(entries, readEntriesCtx);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
index 329abf9f780..94f2a17a2a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -25,8 +25,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -46,8 +46,9 @@ public class CompactedTopicUtilsTest {
PositionImpl initPosition = PositionImpl.get(1, 90);
AtomicReference<PositionImpl> readPositionRef = new
AtomicReference<>(initPosition.getNext());
- ManagedCursor cursor = Mockito.mock(ManagedCursor.class);
+ ManagedCursorImpl cursor = Mockito.mock(ManagedCursorImpl.class);
Mockito.doReturn(readPositionRef.get()).when(cursor).getReadPosition();
+ Mockito.doReturn(1).when(cursor).applyMaxSizeCap(Mockito.anyInt(),
Mockito.anyLong());
Mockito.doAnswer(invocation -> {
readPositionRef.set(invocation.getArgument(0));
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c5dbd9c49aa..afbbe6101f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -90,6 +91,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -1877,4 +1879,51 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
consumer.close();
producer.close();
}
+
+ @Test
+ public void testDispatcherMaxReadSizeBytes() throws Exception {
+ final String topicName =
+
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" +
UUID.randomUUID();
+ final String subName = "my-sub";
+ final int receiveQueueSize = 1;
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+
+ for (int i = 0; i < 10; i+=2) {
+ producer.newMessage().key(null).value(new
byte[4*1024*1024]).send();
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ admin.topics().unload(topicName);
+
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
client.newConsumer(Schema.BYTES)
+
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscribe();
+
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ TopicCompactionService topicCompactionService =
Mockito.spy(topic.getTopicCompactionService());
+ FieldUtils.writeDeclaredField(topic, "topicCompactionService",
topicCompactionService, true);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(consumer.getStats().getMsgNumInReceiverQueue(),
+ 1);
+ });
+
+ consumer.increaseAvailablePermits(2);
+
+ Mockito.verify(topicCompactionService,
Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));
+
+ consumer.close();
+ producer.close();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 135a839bd54..799c2703e1e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -148,5 +148,24 @@ public class StrategicCompactionTest extends
CompactionTest {
Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
}
+ @Override
+ public void testCompactCompressedBatching() throws Exception {
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
+ super.testCompactCompressedBatching();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ }
+
+ @Override
+ public void testCompactEncryptedAndCompressedBatching() throws Exception {
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
+ super.testCompactEncryptedAndCompressedBatching();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ }
+ @Override
+ public void testCompactEncryptedBatching() throws Exception {
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
+ super.testCompactEncryptedBatching();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ }
}