This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e59c850753f [fix][broker] Avoid splitting one batch message into two
entries in StrategicTwoPhaseCompactor (#21091)
e59c850753f is described below
commit e59c850753f2a1d08e4df63bf8862c7cb5db4e71
Author: Kai Wang <[email protected]>
AuthorDate: Mon Sep 4 14:13:06 2023 +0800
[fix][broker] Avoid splitting one batch message into two entries in
StrategicTwoPhaseCompactor (#21091)
---
.../client/impl/RawBatchMessageContainerImpl.java | 41 ++++++++--
.../compaction/StrategicTwoPhaseCompactor.java | 90 ++++++++++------------
.../impl/RawBatchMessageContainerImplTest.java | 53 +++++++------
.../apache/pulsar/compaction/CompactionTest.java | 11 +--
.../StrategicCompactionRetentionTest.java | 2 +-
.../pulsar/compaction/StrategicCompactionTest.java | 81 ++++++++++++++-----
.../pulsar/compaction/StrategicCompactorTest.java | 4 +-
7 files changed, 170 insertions(+), 112 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index 7e1c2cd5e3f..ba8d3db7178 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -44,17 +45,17 @@ import org.apache.pulsar.common.protocol.Commands;
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2,
v3), (k3, v3)]
*/
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
- MessageCrypto msgCrypto;
- Set<String> encryptionKeys;
- CryptoKeyReader cryptoKeyReader;
+ private MessageCrypto<MessageMetadata, MessageMetadata> msgCrypto;
+ private Set<String> encryptionKeys;
+ private CryptoKeyReader cryptoKeyReader;
+ private MessageIdAdv lastAddedMessageId;
- public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int
maxBytesInBatch) {
+ public RawBatchMessageContainerImpl() {
super();
this.compressionType = CompressionType.NONE;
this.compressor = new CompressionCodecNone();
- this.maxNumMessagesInBatch = maxNumMessagesInBatch;
- this.maxBytesInBatch = maxBytesInBatch;
}
+
private ByteBuf encrypt(ByteBuf compressedPayload) {
if (msgCrypto == null) {
return compressedPayload;
@@ -90,6 +91,28 @@ public class RawBatchMessageContainerImpl extends
BatchMessageContainerImpl {
this.cryptoKeyReader = cryptoKeyReader;
}
+ @Override
+ public boolean add(MessageImpl<?> msg, SendCallback callback) {
+ this.lastAddedMessageId = (MessageIdAdv) msg.getMessageId();
+ return super.add(msg, callback);
+ }
+
+ @Override
+ protected boolean isBatchFull() {
+ return false;
+ }
+
+ @Override
+ public boolean haveEnoughSpace(MessageImpl<?> msg) {
+ if (lastAddedMessageId == null) {
+ return true;
+ }
+ // Keep same batch compact to same batch.
+ MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
+ return msgId.getLedgerId() == lastAddedMessageId.getLedgerId()
+ && msgId.getEntryId() == lastAddedMessageId.getEntryId();
+ }
+
/**
* Serializes the batched messages and return the ByteBuf.
* It sets the CompressionType and Encryption Keys from the batched
messages.
@@ -168,4 +191,10 @@ public class RawBatchMessageContainerImpl extends
BatchMessageContainerImpl {
clear();
return buf;
}
+
+ @Override
+ public void clear() {
+ this.lastAddedMessageId = null;
+ super.clear();
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index a6b09427427..fefa2ee959c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.compaction;
-import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Iterator;
@@ -63,39 +62,19 @@ import org.slf4j.LoggerFactory;
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
- private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
- private static final int MAX_BYTES_IN_BATCH = 128 * 1024;
private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20
* 1000;
private final Duration phaseOneLoopReadTimeout;
private final RawBatchMessageContainerImpl batchMessageContainer;
- @VisibleForTesting
public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
- ScheduledExecutorService scheduler,
- int maxNumMessagesInBatch) {
- this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch,
MAX_BYTES_IN_BATCH);
- }
-
- private StrategicTwoPhaseCompactor(ServiceConfiguration conf,
- PulsarClient pulsar,
- BookKeeper bk,
- ScheduledExecutorService scheduler,
- int maxNumMessagesInBatch,
- int maxBytesInBatch) {
+ ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
- batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
+ batchMessageContainer = new RawBatchMessageContainerImpl();
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}
- public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
- PulsarClient pulsar,
- BookKeeper bk,
- ScheduledExecutorService scheduler) {
- this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH,
MAX_BYTES_IN_BATCH);
- }
-
public CompletableFuture<Long> compact(String topic) {
throw new UnsupportedOperationException();
}
@@ -418,7 +397,6 @@ public class StrategicTwoPhaseCompactor extends
TwoPhaseCompactor {
.whenComplete((res, exception2) -> {
if (exception2 != null) {
promise.completeExceptionally(exception2);
- return;
}
});
phaseTwoLoop(topic, reader, lh, outstanding, promise);
@@ -443,35 +421,45 @@ public class StrategicTwoPhaseCompactor extends
TwoPhaseCompactor {
<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore
outstanding) {
+ if (m == null) {
+ return flushBatchMessage(lh, topic, outstanding);
+ }
+ if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
+ batchMessageContainer.add((MessageImpl<?>) m, null);
+ return CompletableFuture.completedFuture(false);
+ }
+ CompletableFuture<Boolean> f = flushBatchMessage(lh, topic,
outstanding);
+ batchMessageContainer.add((MessageImpl<?>) m, null);
+ return f;
+ }
+
+ private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh,
String topic,
+ Semaphore
outstanding) {
+ if (batchMessageContainer.getNumMessagesInBatch() <= 0) {
+ return CompletableFuture.completedFuture(false);
+ }
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
- if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
- if (batchMessageContainer.getNumMessagesInBatch() > 0) {
- try {
- ByteBuf serialized = batchMessageContainer.toByteBuf();
- outstanding.acquire();
- mxBean.addCompactionWriteOp(topic,
serialized.readableBytes());
- long start = System.nanoTime();
- lh.asyncAddEntry(serialized,
- (rc, ledger, eid, ctx) -> {
- outstanding.release();
- mxBean.addCompactionLatencyOp(topic,
System.nanoTime() - start, TimeUnit.NANOSECONDS);
- if (rc != BKException.Code.OK) {
-
bkf.completeExceptionally(BKException.create(rc));
- } else {
- bkf.complete(true);
- }
- }, null);
+ try {
+ ByteBuf serialized = batchMessageContainer.toByteBuf();
+ outstanding.acquire();
+ mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+ long start = System.nanoTime();
+ lh.asyncAddEntry(serialized,
+ (rc, ledger, eid, ctx) -> {
+ outstanding.release();
+ mxBean.addCompactionLatencyOp(topic, System.nanoTime()
- start, TimeUnit.NANOSECONDS);
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(true);
+ }
+ }, null);
- } catch (Throwable t) {
- log.error("Failed to add entry", t);
- batchMessageContainer.discard((Exception) t);
- return FutureUtil.failedFuture(t);
- }
- } else {
- bkf.complete(false);
- }
- } else {
- bkf.complete(false);
+ } catch (Throwable t) {
+ log.error("Failed to add entry", t);
+ batchMessageContainer.discard((Exception) t);
+ bkf.completeExceptionally(t);
+ return bkf;
}
return bkf;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
index 9b8b1e5efb9..d79a31c07f2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -47,7 +47,6 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.compaction.CompactionTest;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class RawBatchMessageContainerImplTest {
@@ -56,8 +55,6 @@ public class RawBatchMessageContainerImplTest {
CryptoKeyReader cryptoKeyReader;
Map<String, EncryptionContext.EncryptionKey> encryptKeys;
- int maxBytesInBatch = 5 * 1024 * 1024;
-
public void setEncryptionAndCompression(boolean encrypt, boolean compress)
{
if (compress) {
compressionType = ZSTD;
@@ -107,22 +104,22 @@ public class RawBatchMessageContainerImplTest {
public void setup() throws Exception {
setEncryptionAndCompression(false, true);
}
- @DataProvider(name = "testBatchLimitByMessageCount")
- public static Object[][] testBatchLimitByMessageCount() {
- return new Object[][] {{true}, {false}};
- }
-
- @Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount")
- public void testToByteBufWithBatchLimit(boolean
testBatchLimitByMessageCount) throws IOException {
- RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ?
- new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) :
- new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5);
+ @Test(timeOut = 20000)
+ public void testToByteBufWithBatchLimit()throws IOException {
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
String topic = "my-topic";
- var full1 = container.add(createMessage(topic, "hi-1", 0), null);
- var full2 = container.add(createMessage(topic, "hi-2", 1), null);
+ MessageImpl message1 = createMessage(topic, "hi-1", 0);
+ boolean hasEnoughSpase1 = container.haveEnoughSpace(message1);
+ var full1 = container.add(message1, null);
assertFalse(full1);
- assertTrue(full2);
+ assertTrue(hasEnoughSpase1);
+ MessageImpl message2 = createMessage(topic, "hi-2", 1);
+ boolean hasEnoughSpase2 = container.haveEnoughSpace(message2);
+ assertFalse(hasEnoughSpase2);
+ var full2 = container.add(message2, null);
+ assertFalse(full2);
+
ByteBuf buf = container.toByteBuf();
@@ -167,7 +164,7 @@ public class RawBatchMessageContainerImplTest {
public void testToByteBufWithCompressionAndEncryption() throws IOException
{
setEncryptionAndCompression(true, true);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2, maxBytesInBatch);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
container.setCryptoKeyReader(cryptoKeyReader);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
@@ -217,7 +214,7 @@ public class RawBatchMessageContainerImplTest {
@Test
public void testToByteBufWithSingleMessage() throws IOException {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2, maxBytesInBatch);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
ByteBuf buf = container.toByteBuf();
@@ -250,25 +247,31 @@ public class RawBatchMessageContainerImplTest {
}
@Test
- public void testMaxNumMessagesInBatch() {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+ public void testAddDifferentBatchMessage() {
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
String topic = "my-topic";
boolean isFull = container.add(createMessage(topic, "hi", 0), null);
- Assert.assertTrue(isFull);
- Assert.assertTrue(container.isBatchFull());
+ Assert.assertFalse(isFull);
+ Assert.assertFalse(container.isBatchFull());
+ MessageImpl message = createMessage(topic, "hi-1", 0);
+ Assert.assertTrue(container.haveEnoughSpace(message));
+ isFull = container.add(message, null);
+ Assert.assertFalse(isFull);
+ message = createMessage(topic, "hi-2", 1);
+ Assert.assertFalse(container.haveEnoughSpace(message));
}
@Test(expectedExceptions = UnsupportedOperationException.class)
public void testCreateOpSendMsg() {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
container.createOpSendMsg();
}
@Test
public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
setEncryptionAndCompression(true, false);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
Assert.assertEquals(container.getNumMessagesInBatch(), 1);
@@ -286,7 +289,7 @@ public class RawBatchMessageContainerImplTest {
@Test
public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
setEncryptionAndCompression(true, false);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl();
container.setCryptoKeyReader(cryptoKeyReader);
encryptKeys = new HashMap<>();
encryptKeys.put(null, null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index afbbe6101f8..5d8c1a8dc1a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -81,7 +81,6 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -542,14 +541,8 @@ public class CompactionTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(message2.getKey(), "key2");
Assert.assertEquals(new String(message2.getData()),
"my-message-3");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
- MessageIdImpl id = (MessageIdImpl)
messages.get(0).getMessageId();
- MessageIdImpl id1 = new MessageIdImpl(
- id.getLedgerId(), id.getEntryId(),
id.getPartitionIndex());
- Assert.assertEquals(message1.getMessageId(), id1);
- id = (MessageIdImpl) messages.get(2).getMessageId();
- MessageIdImpl id2 = new MessageIdImpl(
- id.getLedgerId(), id.getEntryId(),
id.getPartitionIndex());
- Assert.assertEquals(message2.getMessageId(), id2);
+ Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
+ Assert.assertEquals(message2.getMessageId(),
messages.get(1).getMessageId());
} else {
Assert.assertEquals(message1.getMessageId(),
messages.get(0).getMessageId());
Assert.assertEquals(message2.getMessageId(),
messages.get(2).getMessageId());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
index 1cac04c2fa9..e556ec8e0b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
@@ -34,7 +34,7 @@ public class StrategicCompactionRetentionTest extends
CompactionRetentionTest {
@Override
public void setup() throws Exception {
super.setup();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
strategy = new
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
index 799c2703e1e..54563431052 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -18,22 +18,33 @@
*/
package org.apache.pulsar.compaction;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
+import static org.testng.Assert.assertEquals;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -47,7 +58,7 @@ public class StrategicCompactionTest extends CompactionTest {
@Override
public void setup() throws Exception {
super.setup();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
strategy = new
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
}
@@ -148,24 +159,58 @@ public class StrategicCompactionTest extends
CompactionTest {
Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
}
- @Override
- public void testCompactCompressedBatching() throws Exception {
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
- super.testCompactCompressedBatching();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
- }
+ @Test(timeOut = 20000)
+ public void testSameBatchCompactToSameBatch() throws Exception {
+ final String topic =
+
"persistent://my-property/use/my-ns/testSameBatchCompactToSameBatch" +
UUID.randomUUID();
- @Override
- public void testCompactEncryptedAndCompressedBatching() throws Exception {
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
- super.testCompactEncryptedAndCompressedBatching();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
- }
+ // Use odd number to make sure the last message is flush by
`reader.hasNext() == false`.
+ final int messages = 11;
+
+ // 1.create producer and publish message to the topic.
+ ProducerBuilder<Integer> builder =
pulsarClient.newProducer(Schema.INT32)
+ .compressionType(MSG_COMPRESSION_TYPE).topic(topic);
+ builder.batchingMaxMessages(2)
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
+
+ Producer<Integer> producer = builder.create();
+
+ List<CompletableFuture<MessageId>> futures = new ArrayList<>(messages);
+ for (int i = 0; i < messages; i++) {
+ futures.add(producer.newMessage().key(String.valueOf(i))
+ .value(i)
+ .sendAsync());
+ }
+ FutureUtil.waitForAll(futures).get();
+
+ // 2.compact the topic.
+ StrategicTwoPhaseCompactor compactor
+ = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
+ compactor.compact(topic, strategy).get();
+
+ // consumer with readCompacted enabled only get compacted entries
+ try (Consumer<Integer> consumer = pulsarClient
+ .newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .readCompacted(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe())
{
+ int received = 0;
+ while (true) {
+ Message<Integer> m = consumer.receive(2, TimeUnit.SECONDS);
+ if (m == null) {
+ break;
+ }
+ MessageIdAdv messageId = (MessageIdAdv) m.getMessageId();
+ if (received < messages - 1) {
+ assertEquals(messageId.getBatchSize(), 2);
+ } else {
+ assertEquals(messageId.getBatchSize(), 0);
+ }
+ received++;
+ }
+ assertEquals(received, messages);
+ }
- @Override
- public void testCompactEncryptedBatching() throws Exception {
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 10);
- super.testCompactEncryptedBatching();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
index 91dd8a2bd35..bc65791b323 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
@@ -33,7 +33,7 @@ public class StrategicCompactorTest extends CompactorTest {
@Override
public void setup() throws Exception {
super.setup();
- compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler, 1);
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk,
compactionScheduler);
strategy = new
TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
}
@@ -46,4 +46,4 @@ public class StrategicCompactorTest extends CompactorTest {
protected Compactor getCompactor() {
return compactor;
}
-}
\ No newline at end of file
+}