This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c8d2bba651b [improve][broker] PIP-192 Fix getLastMessageId for 
compressed payload (And add compression and maxBatchSize for the load balance 
system topic) (#20087)
c8d2bba651b is described below

commit c8d2bba651b45b9345ba3f3c4e5bcfe2468a2206
Author: Heesung Sohn <[email protected]>
AuthorDate: Thu Apr 13 10:56:22 2023 -0700

    [improve][broker] PIP-192 Fix getLastMessageId for compressed payload (And 
add compression and maxBatchSize for the load balance system topic) (#20087)
---
 .../channel/ServiceUnitStateChannelImpl.java       |  4 ++
 .../pulsar/broker/service/BrokerService.java       |  6 +--
 .../apache/pulsar/broker/service/ServerCnx.java    | 11 ++++
 .../client/impl/RawBatchMessageContainerImpl.java  | 12 ++---
 .../compaction/StrategicTwoPhaseCompactor.java     | 17 +++++-
 .../impl/RawBatchMessageContainerImplTest.java     | 63 +++++++++++++++-------
 .../compaction/GetLastMessageIdCompactedTest.java  | 43 +++++++++++++++
 .../compaction/ServiceUnitStateCompactionTest.java | 24 +++++++--
 8 files changed, 145 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd62c53be60..68c6440e68e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -81,6 +81,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -106,6 +107,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             TopicDomain.persistent.value(),
             SYSTEM_NAMESPACE,
             "loadbalancer-service-unit-state").toString();
+
+    public static final CompressionType MSG_COMPRESSION_TYPE = 
CompressionType.ZSTD;
     private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 
* 1000; // 30sec
     public static final long VERSION_ID_INIT = 1; // initial versionId
     private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60;
@@ -285,6 +288,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
             producer = pulsar.getClient().newProducer(schema)
                     .enableBatching(true)
+                    .compressionType(MSG_COMPRESSION_TYPE)
                     .maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES)
                     .blockIfQueueFull(true)
                     .topic(TOPIC)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bb08734298f..33e5500d623 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2144,9 +2144,9 @@ public class BrokerService implements Closeable {
                     if (ownedByThisInstance) {
                         return CompletableFuture.completedFuture(null);
                     } else {
-                        String msg = String.format("Namespace bundle for topic 
(%s) not served by this instance. "
-                                        + "Please redo the lookup. Request is 
denied: namespace=%s", topic,
-                                topicName.getNamespace());
+                        String msg = String.format("Namespace bundle for topic 
(%s) not served by this instance:%s. "
+                                        + "Please redo the lookup. Request is 
denied: namespace=%s",
+                                topic, pulsar.getLookupServiceAddress(), 
topicName.getNamespace());
                         log.warn(msg);
                         return FutureUtil.failedFuture(new 
ServiceUnitNotReadyException(msg));
                     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 01274ab4460..888668e15b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -129,6 +129,7 @@ import 
org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
+import org.apache.pulsar.common.api.proto.CompressionType;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -141,6 +142,8 @@ import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -2167,6 +2170,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (batchSize <= 1){
             return -1;
         }
+        if (metadata.hasCompression()) {
+            var tmp = payload;
+            CompressionType compressionType = metadata.getCompression();
+            CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+            int uncompressedSize = metadata.getUncompressedSize();
+            payload = codec.decode(payload, uncompressedSize);
+            tmp.release();
+        }
         SingleMessageMetadata singleMessageMetadata = new 
SingleMessageMetadata();
         int lastBatchIndexInBatch = -1;
         for (int i = 0; i < batchSize; i++){
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index cf6b213155c..7e1c2cd5e3f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -47,13 +47,13 @@ public class RawBatchMessageContainerImpl extends 
BatchMessageContainerImpl {
     MessageCrypto msgCrypto;
     Set<String> encryptionKeys;
     CryptoKeyReader cryptoKeyReader;
-    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
+
+    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int 
maxBytesInBatch) {
         super();
-        compressionType = CompressionType.NONE;
-        compressor = new CompressionCodecNone();
-        if (maxNumMessagesInBatch > 0) {
-            this.maxNumMessagesInBatch = maxNumMessagesInBatch;
-        }
+        this.compressionType = CompressionType.NONE;
+        this.compressor = new CompressionCodecNone();
+        this.maxNumMessagesInBatch = maxNumMessagesInBatch;
+        this.maxBytesInBatch = maxBytesInBatch;
     }
     private ByteBuf encrypt(ByteBuf compressedPayload) {
         if (msgCrypto == null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
index 37b03e275d6..557d4a65801 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.time.Duration;
 import java.util.Iterator;
@@ -62,17 +63,29 @@ import org.slf4j.LoggerFactory;
 public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
     private static final Logger log = 
LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
+    private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
+    private static final int MAX_BYTES_IN_BATCH = 128 * 1024;
     private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 
* 1000;
     private final Duration phaseOneLoopReadTimeout;
     private final RawBatchMessageContainerImpl batchMessageContainer;
 
+    @VisibleForTesting
     public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
                                       PulsarClient pulsar,
                                       BookKeeper bk,
                                       ScheduledExecutorService scheduler,
                                       int maxNumMessagesInBatch) {
+        this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch, 
MAX_BYTES_IN_BATCH);
+    }
+
+    private StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+                                      PulsarClient pulsar,
+                                      BookKeeper bk,
+                                      ScheduledExecutorService scheduler,
+                                      int maxNumMessagesInBatch,
+                                      int maxBytesInBatch) {
         super(conf, pulsar, bk, scheduler);
-        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+        batchMessageContainer = new 
RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
         phaseOneLoopReadTimeout = 
Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
     }
 
@@ -80,7 +93,7 @@ public class StrategicTwoPhaseCompactor extends 
TwoPhaseCompactor {
                                       PulsarClient pulsar,
                                       BookKeeper bk,
                                       ScheduledExecutorService scheduler) {
-        this(conf, pulsar, bk, scheduler, -1);
+        this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH, 
MAX_BYTES_IN_BATCH);
     }
 
     public CompletableFuture<Long> compact(String topic) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
index 9fa834a166c..9b8b1e5efb9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 
-import static org.apache.pulsar.common.api.proto.CompressionType.LZ4;
 import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
+import static org.apache.pulsar.common.api.proto.CompressionType.ZSTD;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
@@ -45,6 +47,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.compaction.CompactionTest;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class RawBatchMessageContainerImplTest {
@@ -53,9 +56,11 @@ public class RawBatchMessageContainerImplTest {
     CryptoKeyReader cryptoKeyReader;
     Map<String, EncryptionContext.EncryptionKey> encryptKeys;
 
+    int maxBytesInBatch = 5 * 1024 * 1024;
+
     public void setEncryptionAndCompression(boolean encrypt, boolean compress) 
{
         if (compress) {
-            compressionType = LZ4;
+            compressionType = ZSTD;
         } else {
             compressionType = NONE;
         }
@@ -100,14 +105,24 @@ public class RawBatchMessageContainerImplTest {
 
     @BeforeMethod
     public void setup() throws Exception {
-        setEncryptionAndCompression(false, false);
+        setEncryptionAndCompression(false, true);
     }
-    @Test
-    public void testToByteBuf() throws IOException {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2);
+    @DataProvider(name = "testBatchLimitByMessageCount")
+    public static Object[][] testBatchLimitByMessageCount() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount")
+    public void testToByteBufWithBatchLimit(boolean 
testBatchLimitByMessageCount) throws IOException {
+        RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ?
+                new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) :
+                new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5);
+
         String topic = "my-topic";
-        container.add(createMessage(topic, "hi-1", 0), null);
-        container.add(createMessage(topic, "hi-2", 1), null);
+        var full1 = container.add(createMessage(topic, "hi-1", 0), null);
+        var full2 = container.add(createMessage(topic, "hi-2", 1), null);
+        assertFalse(full1);
+        assertTrue(full2);
         ByteBuf buf = container.toByteBuf();
 
 
@@ -126,18 +141,23 @@ public class RawBatchMessageContainerImplTest {
         MessageMetadata metadata = 
singleMessageMetadataAndPayload.getMessageBuilder();
         Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
         Assert.assertEquals(metadata.getHighestSequenceId(), 1);
-        Assert.assertEquals(metadata.getCompression(), NONE);
+        Assert.assertEquals(metadata.getCompression(), ZSTD);
+
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf payload = codec.decode(metadataAndPayload, 
metadata.getUncompressedSize());
 
         SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+        messageMetadata.setCompactedOut(true);
         ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
-                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 
0, 2);
+                payload, messageMetadata, 0, 2);
         ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
-                singleMessageMetadataAndPayload.getPayload(), messageMetadata, 
1, 2);
+                payload, messageMetadata, 1, 2);
 
         Assert.assertEquals(payload1.toString(Charset.defaultCharset()), 
"hi-1");
         Assert.assertEquals(payload2.toString(Charset.defaultCharset()), 
"hi-2");
         payload1.release();
         payload2.release();
+        payload.release();
         singleMessageMetadataAndPayload.release();
         metadataAndPayload.release();
         buf.release();
@@ -147,7 +167,7 @@ public class RawBatchMessageContainerImplTest {
     public void testToByteBufWithCompressionAndEncryption() throws IOException 
{
         setEncryptionAndCompression(true, true);
 
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2, maxBytesInBatch);
         container.setCryptoKeyReader(cryptoKeyReader);
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
@@ -169,7 +189,7 @@ public class RawBatchMessageContainerImplTest {
         MessageMetadata metadata = 
singleMessageMetadataAndPayload.getMessageBuilder();
         Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
         Assert.assertEquals(metadata.getHighestSequenceId(), 1);
-        Assert.assertEquals(metadata.getCompression(), compressionType);
+        Assert.assertEquals(metadata.getCompression(), ZSTD);
 
         ByteBuf payload = singleMessageMetadataAndPayload.getPayload();
         int maxDecryptedSize = 
msgCrypto.getMaxOutputSize(payload.readableBytes());
@@ -197,7 +217,7 @@ public class RawBatchMessageContainerImplTest {
 
     @Test
     public void testToByteBufWithSingleMessage() throws IOException {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(2, maxBytesInBatch);
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
         ByteBuf buf = container.toByteBuf();
@@ -218,9 +238,12 @@ public class RawBatchMessageContainerImplTest {
         MessageMetadata metadata = 
singleMessageMetadataAndPayload.getMessageBuilder();
         Assert.assertEquals(metadata.getNumMessagesInBatch(), 1);
         Assert.assertEquals(metadata.getHighestSequenceId(), 0);
-        Assert.assertEquals(metadata.getCompression(), NONE);
+        Assert.assertEquals(metadata.getCompression(), ZSTD);
+
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf payload = codec.decode(metadataAndPayload, 
metadata.getUncompressedSize());
 
-        
Assert.assertEquals(singleMessageMetadataAndPayload.getPayload().toString(Charset.defaultCharset()),
 "hi-1");
+        Assert.assertEquals(payload.toString(Charset.defaultCharset()), 
"hi-1");
         singleMessageMetadataAndPayload.release();
         metadataAndPayload.release();
         buf.release();
@@ -228,7 +251,7 @@ public class RawBatchMessageContainerImplTest {
 
     @Test
     public void testMaxNumMessagesInBatch() {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
         String topic = "my-topic";
 
         boolean isFull = container.add(createMessage(topic, "hi", 0), null);
@@ -238,14 +261,14 @@ public class RawBatchMessageContainerImplTest {
 
     @Test(expectedExceptions = UnsupportedOperationException.class)
     public void testCreateOpSendMsg() {
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
         container.createOpSendMsg();
     }
 
     @Test
     public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
         setEncryptionAndCompression(true, false);
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
         String topic = "my-topic";
         container.add(createMessage(topic, "hi-1", 0), null);
         Assert.assertEquals(container.getNumMessagesInBatch(), 1);
@@ -263,7 +286,7 @@ public class RawBatchMessageContainerImplTest {
     @Test
     public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
         setEncryptionAndCompression(true, false);
-        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1);
+        RawBatchMessageContainerImpl container = new 
RawBatchMessageContainerImpl(1, maxBytesInBatch);
         container.setCryptoKeyReader(cryptoKeyReader);
         encryptKeys = new HashMap<>();
         encryptKeys.put(null, null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 0be9fa40754..317b1a227e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -263,6 +264,48 @@ public class GetLastMessageIdCompactedTest extends 
ProducerConsumerBase {
         admin.topics().delete(topicName, false);
     }
 
+    @Test(dataProvider = "enabledBatch")
+    public void testGetLastMessageIdAfterCompactionWithCompression(boolean 
enabledBatch) throws Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Consumer<String> consumer = createConsumer(topicName, subName);
+        var producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .batchingMaxPublishDelay(3, TimeUnit.HOURS)
+                .batchingMaxBytes(Integer.MAX_VALUE)
+                .compressionType(CompressionType.ZSTD)
+                .enableBatching(enabledBatch).create();
+
+        List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+        
sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync());
+        
sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync());
+        
sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync());
+        producer.flush();
+        
sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync());
+        
sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync());
+        
sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync());
+        producer.flush();
+        FutureUtil.waitForAll(sendFutures).join();
+
+        triggerCompactionAndWait(topicName);
+
+        MessageIdImpl lastMessageIdByTopic = 
getLastMessageIdByTopic(topicName);
+        MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId();
+        assertEquals(messageId.getLedgerId(), 
lastMessageIdByTopic.getLedgerId());
+        assertEquals(messageId.getEntryId(), 
lastMessageIdByTopic.getEntryId());
+        if (enabledBatch){
+            BatchMessageIdImpl lastBatchMessageIdByTopic = 
(BatchMessageIdImpl) lastMessageIdByTopic;
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            assertEquals(batchMessageId.getBatchSize(), 
lastBatchMessageIdByTopic.getBatchSize());
+            assertEquals(batchMessageId.getBatchIndex(), 
lastBatchMessageIdByTopic.getBatchIndex());
+        }
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
     @Test(dataProvider = "enabledBatch")
     public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean 
enabledBatch) throws Exception {
         String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
index 02812898dc4..1a69a86f7c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Releasing;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -185,6 +186,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
@@ -352,12 +354,13 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
         compactor.compact(topic, strategy).get();
 
         // consumer with readCompacted enabled only get compacted entries
-        var tableview = pulsar.getClient().newTableViewBuilder(schema)
+        var tableview = pulsar.getClient().newTableView(schema)
                 .topic(topic)
                 .loadConf(Map.of(
                         "topicCompactionStrategyClassName",
                         ServiceUnitStateCompactionStrategy.class.getName()))
                 .create();
+
         for(var etr : tableview.entrySet()){
             Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue());
             if (expected.isEmpty()) {
@@ -376,6 +379,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .create();
 
@@ -420,6 +424,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .create();
 
@@ -460,6 +465,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .create();
 
@@ -557,6 +563,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
@@ -627,6 +634,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .create();
         String key = "key0";
@@ -672,6 +680,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema)
                 .topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
                 .enableBatching(true)
                 .create();
 
@@ -733,7 +742,9 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
     public void testCompactionWithLastDeletedKey() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema).topic(topic).enableBatching(true)
+        Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema).topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
+                .enableBatching(true)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 
         
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -761,7 +772,9 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
     public void testEmptyCompactionLedger() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
 
-        Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema).topic(topic).enableBatching(true)
+        Producer<ServiceUnitStateData> producer = 
pulsarClient.newProducer(schema).topic(topic)
+                .compressionType(MSG_COMPRESSION_TYPE)
+                .enableBatching(true)
                 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
 
         
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
@@ -791,7 +804,8 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
         final int messages = 10;
 
         // 1.create producer and publish message to the topic.
-        ProducerBuilder<ServiceUnitStateData> builder = 
pulsarClient.newProducer(schema).topic(topic);
+        ProducerBuilder<ServiceUnitStateData> builder = 
pulsarClient.newProducer(schema)
+                .compressionType(MSG_COMPRESSION_TYPE).topic(topic);
         builder.batchingMaxMessages(messages / 5);
 
         Producer<ServiceUnitStateData> producer = builder.create();
@@ -828,6 +842,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         // 1.create producer and publish message to the topic.
         ProducerBuilder<ServiceUnitStateData> builder = 
pulsarClient.newProducer(schema).topic(topic);
+        builder.compressionType(MSG_COMPRESSION_TYPE);
         builder.enableBatching(true);
 
 
@@ -876,6 +891,7 @@ public class ServiceUnitStateCompactionTest extends 
MockedPulsarServiceBaseTest
 
         // 1.create producer and publish message to the topic.
         ProducerBuilder<ServiceUnitStateData> builder = 
pulsarClient.newProducer(schema).topic(topic);
+        builder.compressionType(MSG_COMPRESSION_TYPE);
         builder.batchingMaxMessages(messages / 5);
 
         Producer<ServiceUnitStateData> producer = builder.create();

Reply via email to