This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c8d2bba651b [improve][broker] PIP-192 Fix getLastMessageId for
compressed payload(And add compression and maxBatchSize for the load balance
system topic) (#20087)
c8d2bba651b is described below
commit c8d2bba651b45b9345ba3f3c4e5bcfe2468a2206
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Apr 13 10:56:22 2023 -0700
[improve][broker] PIP-192 Fix getLastMessageId for compressed payload(And
add compression and maxBatchSize for the load balance system topic) (#20087)
---
.../channel/ServiceUnitStateChannelImpl.java | 4 ++
.../pulsar/broker/service/BrokerService.java | 6 +--
.../apache/pulsar/broker/service/ServerCnx.java | 11 ++++
.../client/impl/RawBatchMessageContainerImpl.java | 12 ++---
.../compaction/StrategicTwoPhaseCompactor.java | 17 +++++-
.../impl/RawBatchMessageContainerImplTest.java | 63 +++++++++++++++-------
.../compaction/GetLastMessageIdCompactedTest.java | 43 +++++++++++++++
.../compaction/ServiceUnitStateCompactionTest.java | 24 +++++++--
8 files changed, 145 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd62c53be60..68c6440e68e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -81,6 +81,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -106,6 +107,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
TopicDomain.persistent.value(),
SYSTEM_NAMESPACE,
"loadbalancer-service-unit-state").toString();
+
+ public static final CompressionType MSG_COMPRESSION_TYPE =
CompressionType.ZSTD;
private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30
* 1000; // 30sec
public static final long VERSION_ID_INIT = 1; // initial versionId
private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
@@ -285,6 +288,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
+ .compressionType(MSG_COMPRESSION_TYPE)
.maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES)
.blockIfQueueFull(true)
.topic(TOPIC)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bb08734298f..33e5500d623 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2144,9 +2144,9 @@ public class BrokerService implements Closeable {
if (ownedByThisInstance) {
return CompletableFuture.completedFuture(null);
} else {
- String msg = String.format("Namespace bundle for topic
(%s) not served by this instance. "
- + "Please redo the lookup. Request is
denied: namespace=%s", topic,
- topicName.getNamespace());
+ String msg = String.format("Namespace bundle for topic
(%s) not served by this instance:%s. "
+ + "Please redo the lookup. Request is
denied: namespace=%s",
+ topic, pulsar.getLookupServiceAddress(),
topicName.getNamespace());
log.warn(msg);
return FutureUtil.failedFuture(new
ServiceUnitNotReadyException(msg));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 01274ab4460..888668e15b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@ import
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
+import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -141,6 +142,8 @@ import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -2167,6 +2170,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (batchSize <= 1){
return -1;
}
+ if (metadata.hasCompression()) {
+ var tmp = payload;
+ CompressionType compressionType = metadata.getCompression();
+ CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
+ int uncompressedSize = metadata.getUncompressedSize();
+ payload = codec.decode(payload, uncompressedSize);
+ tmp.release();
+ }
SingleMessageMetadata singleMessageMetadata = new
SingleMessageMetadata();
int lastBatchIndexInBatch = -1;
for (int i = 0; i < batchSize; i++){
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index cf6b213155c..7e1c2cd5e3f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -47,13 +47,13 @@ public class RawBatchMessageContainerImpl extends
BatchMessageContainerImpl {
MessageCrypto msgCrypto;
Set<String> encryptionKeys;
CryptoKeyReader cryptoKeyReader;
- public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
+
+ public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int
maxBytesInBatch) {
super();
- compressionType = CompressionType.NONE;
- compressor = new CompressionCodecNone();
- if (maxNumMessagesInBatch > 0) {
- this.maxNumMessagesInBatch = maxNumMessagesInBatch;
- }
+ this.compressionType = CompressionType.NONE;
+ this.compressor = new CompressionCodecNone();
+ this.maxNumMessagesInBatch = maxNumMessagesInBatch;
+ this.maxBytesInBatch = maxBytesInBatch;
}
private ByteBuf encrypt(ByteBuf compressedPayload) {
if (msgCrypto == null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index 37b03e275d6..557d4a65801 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.compaction;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Iterator;
@@ -62,17 +63,29 @@ import org.slf4j.LoggerFactory;
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
private static final Logger log =
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
+ private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
+ private static final int MAX_BYTES_IN_BATCH = 128 * 1024;
private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20
* 1000;
private final Duration phaseOneLoopReadTimeout;
private final RawBatchMessageContainerImpl batchMessageContainer;
+ @VisibleForTesting
public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler,
int maxNumMessagesInBatch) {
+ this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch,
MAX_BYTES_IN_BATCH);
+ }
+
+ private StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch,
+ int maxBytesInBatch) {
super(conf, pulsar, bk, scheduler);
- batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ batchMessageContainer = new
RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
phaseOneLoopReadTimeout =
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}
@@ -80,7 +93,7 @@ public class StrategicTwoPhaseCompactor extends
TwoPhaseCompactor {
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
- this(conf, pulsar, bk, scheduler, -1);
+ this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH,
MAX_BYTES_IN_BATCH);
}
public CompletableFuture<Long> compact(String topic) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
index 9fa834a166c..9b8b1e5efb9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl;
-import static org.apache.pulsar.common.api.proto.CompressionType.LZ4;
import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
+import static org.apache.pulsar.common.api.proto.CompressionType.ZSTD;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
@@ -45,6 +47,7 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.compaction.CompactionTest;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class RawBatchMessageContainerImplTest {
@@ -53,9 +56,11 @@ public class RawBatchMessageContainerImplTest {
CryptoKeyReader cryptoKeyReader;
Map<String, EncryptionContext.EncryptionKey> encryptKeys;
+ int maxBytesInBatch = 5 * 1024 * 1024;
+
public void setEncryptionAndCompression(boolean encrypt, boolean compress)
{
if (compress) {
- compressionType = LZ4;
+ compressionType = ZSTD;
} else {
compressionType = NONE;
}
@@ -100,14 +105,24 @@ public class RawBatchMessageContainerImplTest {
@BeforeMethod
public void setup() throws Exception {
- setEncryptionAndCompression(false, false);
+ setEncryptionAndCompression(false, true);
}
- @Test
- public void testToByteBuf() throws IOException {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2);
+ @DataProvider(name = "testBatchLimitByMessageCount")
+ public static Object[][] testBatchLimitByMessageCount() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount")
+ public void testToByteBufWithBatchLimit(boolean
testBatchLimitByMessageCount) throws IOException {
+ RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ?
+ new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) :
+ new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5);
+
String topic = "my-topic";
- container.add(createMessage(topic, "hi-1", 0), null);
- container.add(createMessage(topic, "hi-2", 1), null);
+ var full1 = container.add(createMessage(topic, "hi-1", 0), null);
+ var full2 = container.add(createMessage(topic, "hi-2", 1), null);
+ assertFalse(full1);
+ assertTrue(full2);
ByteBuf buf = container.toByteBuf();
@@ -126,18 +141,23 @@ public class RawBatchMessageContainerImplTest {
MessageMetadata metadata =
singleMessageMetadataAndPayload.getMessageBuilder();
Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
Assert.assertEquals(metadata.getHighestSequenceId(), 1);
- Assert.assertEquals(metadata.getCompression(), NONE);
+ Assert.assertEquals(metadata.getCompression(), ZSTD);
+
+ CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf payload = codec.decode(metadataAndPayload,
metadata.getUncompressedSize());
SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+ messageMetadata.setCompactedOut(true);
ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
- singleMessageMetadataAndPayload.getPayload(), messageMetadata,
0, 2);
+ payload, messageMetadata, 0, 2);
ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
- singleMessageMetadataAndPayload.getPayload(), messageMetadata,
1, 2);
+ payload, messageMetadata, 1, 2);
Assert.assertEquals(payload1.toString(Charset.defaultCharset()),
"hi-1");
Assert.assertEquals(payload2.toString(Charset.defaultCharset()),
"hi-2");
payload1.release();
payload2.release();
+ payload.release();
singleMessageMetadataAndPayload.release();
metadataAndPayload.release();
buf.release();
@@ -147,7 +167,7 @@ public class RawBatchMessageContainerImplTest {
public void testToByteBufWithCompressionAndEncryption() throws IOException
{
setEncryptionAndCompression(true, true);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2, maxBytesInBatch);
container.setCryptoKeyReader(cryptoKeyReader);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
@@ -169,7 +189,7 @@ public class RawBatchMessageContainerImplTest {
MessageMetadata metadata =
singleMessageMetadataAndPayload.getMessageBuilder();
Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
Assert.assertEquals(metadata.getHighestSequenceId(), 1);
- Assert.assertEquals(metadata.getCompression(), compressionType);
+ Assert.assertEquals(metadata.getCompression(), ZSTD);
ByteBuf payload = singleMessageMetadataAndPayload.getPayload();
int maxDecryptedSize =
msgCrypto.getMaxOutputSize(payload.readableBytes());
@@ -197,7 +217,7 @@ public class RawBatchMessageContainerImplTest {
@Test
public void testToByteBufWithSingleMessage() throws IOException {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(2, maxBytesInBatch);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
ByteBuf buf = container.toByteBuf();
@@ -218,9 +238,12 @@ public class RawBatchMessageContainerImplTest {
MessageMetadata metadata =
singleMessageMetadataAndPayload.getMessageBuilder();
Assert.assertEquals(metadata.getNumMessagesInBatch(), 1);
Assert.assertEquals(metadata.getHighestSequenceId(), 0);
- Assert.assertEquals(metadata.getCompression(), NONE);
+ Assert.assertEquals(metadata.getCompression(), ZSTD);
+
+ CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf payload = codec.decode(metadataAndPayload,
metadata.getUncompressedSize());
-
Assert.assertEquals(singleMessageMetadataAndPayload.getPayload().toString(Charset.defaultCharset()),
"hi-1");
+ Assert.assertEquals(payload.toString(Charset.defaultCharset()),
"hi-1");
singleMessageMetadataAndPayload.release();
metadataAndPayload.release();
buf.release();
@@ -228,7 +251,7 @@ public class RawBatchMessageContainerImplTest {
@Test
public void testMaxNumMessagesInBatch() {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
String topic = "my-topic";
boolean isFull = container.add(createMessage(topic, "hi", 0), null);
@@ -238,14 +261,14 @@ public class RawBatchMessageContainerImplTest {
@Test(expectedExceptions = UnsupportedOperationException.class)
public void testCreateOpSendMsg() {
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
container.createOpSendMsg();
}
@Test
public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
setEncryptionAndCompression(true, false);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
Assert.assertEquals(container.getNumMessagesInBatch(), 1);
@@ -263,7 +286,7 @@ public class RawBatchMessageContainerImplTest {
@Test
public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
setEncryptionAndCompression(true, false);
- RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1);
+ RawBatchMessageContainerImpl container = new
RawBatchMessageContainerImpl(1, maxBytesInBatch);
container.setCryptoKeyReader(cryptoKeyReader);
encryptKeys = new HashMap<>();
encryptKeys.put(null, null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 0be9fa40754..317b1a227e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -263,6 +264,48 @@ public class GetLastMessageIdCompactedTest extends
ProducerConsumerBase {
admin.topics().delete(topicName, false);
}
+ @Test(dataProvider = "enabledBatch")
+ public void testGetLastMessageIdAfterCompactionWithCompression(boolean
enabledBatch) throws Exception {
+ String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
+ String subName = "sub";
+ Consumer<String> consumer = createConsumer(topicName, subName);
+ var producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .batchingMaxPublishDelay(3, TimeUnit.HOURS)
+ .batchingMaxBytes(Integer.MAX_VALUE)
+ .compressionType(CompressionType.ZSTD)
+ .enableBatching(enabledBatch).create();
+
+ List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+
sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+
sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+
sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+ producer.flush();
+
sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+
sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+
sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+ producer.flush();
+ FutureUtil.waitForAll(sendFutures).join();
+
+ triggerCompactionAndWait(topicName);
+
+ MessageIdImpl lastMessageIdByTopic =
getLastMessageIdByTopic(topicName);
+ MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
+ assertEquals(messageId.getLedgerId(),
lastMessageIdByTopic.getLedgerId());
+ assertEquals(messageId.getEntryId(),
lastMessageIdByTopic.getEntryId());
+ if (enabledBatch){
+ BatchMessageIdImpl lastBatchMessageIdByTopic =
(BatchMessageIdImpl) lastMessageIdByTopic;
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+ assertEquals(batchMessageId.getBatchSize(),
lastBatchMessageIdByTopic.getBatchSize());
+ assertEquals(batchMessageId.getBatchIndex(),
lastBatchMessageIdByTopic.getBatchIndex());
+ }
+
+ // cleanup.
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
+
@Test(dataProvider = "enabledBatch")
public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean
enabledBatch) throws Exception {
String topicName = "persistent://public/default/" +
BrokerTestUtil.newUniqueName("tp");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index 02812898dc4..1a69a86f7c6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -25,6 +25,7 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
+import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -185,6 +186,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@@ -352,12 +354,13 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
compactor.compact(topic, strategy).get();
// consumer with readCompacted enabled only get compacted entries
- var tableview = pulsar.getClient().newTableViewBuilder(schema)
+ var tableview = pulsar.getClient().newTableView(schema)
.topic(topic)
.loadConf(Map.of(
"topicCompactionStrategyClassName",
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
+
for(var etr : tableview.entrySet()){
Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue());
if (expected.isEmpty()) {
@@ -376,6 +379,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.create();
@@ -420,6 +424,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.create();
@@ -460,6 +465,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.create();
@@ -557,6 +563,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@@ -627,6 +634,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.create();
String key = "key0";
@@ -672,6 +680,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema)
.topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.create();
@@ -733,7 +742,9 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
public void testCompactionWithLastDeletedKey() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
- Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema).topic(topic).enableBatching(true)
+ Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema).topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
+ .enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -761,7 +772,9 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
public void testEmptyCompactionLedger() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
- Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema).topic(topic).enableBatching(true)
+ Producer<ServiceUnitStateData> producer =
pulsarClient.newProducer(schema).topic(topic)
+ .compressionType(MSG_COMPRESSION_TYPE)
+ .enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -791,7 +804,8 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
final int messages = 10;
// 1.create producer and publish message to the topic.
- ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema)
+ .compressionType(MSG_COMPRESSION_TYPE).topic(topic);
builder.batchingMaxMessages(messages / 5);
Producer<ServiceUnitStateData> producer = builder.create();
@@ -828,6 +842,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
// 1.create producer and publish message to the topic.
ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ builder.compressionType(MSG_COMPRESSION_TYPE);
builder.enableBatching(true);
@@ -876,6 +891,7 @@ public class ServiceUnitStateCompactionTest extends
MockedPulsarServiceBaseTest
// 1.create producer and publish message to the topic.
ProducerBuilder<ServiceUnitStateData> builder =
pulsarClient.newProducer(schema).topic(topic);
+ builder.compressionType(MSG_COMPRESSION_TYPE);
builder.batchingMaxMessages(messages / 5);
Producer<ServiceUnitStateData> producer = builder.create();