This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c39726219aeb48b68f637a00e10b1208223a099
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jul 13 16:56:01 2021 +0200

    Fixed retention of keys in compaction (#11287)
    
    This change fixes few issues in the compaction mechanism, the
    
     * When a reader is created, reading from "earliest" message, it should 
read the compacted data and then continue from the next message.
     * When the compaction consumer starts, it shouldn't seek to the beginning. 
This causes 2 issues:
       * Rescanning of the topic each time the compaction runs
       * Keys that are being dropped from the topic are also getting dropped 
from the compacted view, while in fact they should be there until explicitly 
deleted (with an empty message for a key).
    
    The main source of the problem is that when creating a cursor on "earliest" 
message, the cursor gets automatically adjusted on the earliest message 
available to read. This confuses the check for the read-compacted because it 
may think the reader/consumer is already ahead of the compaction horizon.
    
    Introduced a "isFirstRead" flag to make sure we double check the start 
message id and use `MessageId.earliest` instead of the earliest available 
message to read on the topic. After the first read, the positioning will be 
fine.
    
    (cherry picked from commit feb4ff19e097a9d8f13b093e8fb25dc12c31227b)
---
 .../AbstractDispatcherSingleActiveConsumer.java    |   6 +
 .../org/apache/pulsar/broker/service/Consumer.java |  12 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   4 +-
 ...entStreamingDispatcherSingleActiveConsumer.java |   5 +-
 .../broker/service/persistent/PersistentTopic.java |   3 +-
 .../org/apache/pulsar/client/api/RawReader.java    |   2 +-
 .../apache/pulsar/compaction/CompactedTopic.java   |   8 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  29 ++-
 .../PersistentDispatcherFailoverConsumerTest.java  |  19 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  43 ++--
 .../pulsar/compaction/CompactionRetentionTest.java | 229 +++++++++++++++++++++
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    |  28 +--
 13 files changed, 328 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index e73daaa..690a598 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
extends AbstractBas
             
AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class,
 "isClosed");
     private volatile int isClosed = FALSE;
 
+    protected boolean isFirstRead = true;
+
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, 
int partitionIndex,
                                                   String topicName, 
Subscription subscription,
                                                   ServiceConfiguration 
serviceConfig) {
@@ -159,6 +161,10 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
             isKeyHashRangeFiltered = false;
         }
 
+        if (consumers.isEmpty()) {
+            isFirstRead = true;
+        }
+
         consumers.add(consumer);
 
         if (!pickAndScheduleActiveConsumer()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f093cd6..4cfc089 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -123,12 +124,13 @@ public class Consumer {
     private boolean preciseDispatcherFlowControl;
     private PositionImpl readPositionWhenJoining;
     private final String clientAddress; // IP address only, no port number 
included
+    private final MessageId startMessageId;
 
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition,
-                    KeySharedMeta keySharedMeta) {
+                    KeySharedMeta keySharedMeta, MessageId startMessageId) {
 
         this.subscription = subscription;
         this.subType = subType;
@@ -148,6 +150,10 @@ public class Consumer {
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
         this.appId = appId;
+
+        // Ensure we start from compacted view
+        this.startMessageId = (readCompacted && startMessageId == null) ? 
MessageId.earliest : startMessageId;
+
         this.preciseDispatcherFlowControl = 
cnx.isPreciseDispatcherFlowControl();
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
         MESSAGE_PERMITS_UPDATER.set(this, 0);
@@ -835,5 +841,9 @@ public class Consumer {
         return clientAddress;
     }
 
+    public MessageId getStartMessageId() {
+        return startMessageId;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Consumer.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5807d00..0c7b965 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -272,7 +272,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         NonPersistentSubscription subscription = 
subscriptions.computeIfAbsent(subscriptionName,
                 name -> new NonPersistentSubscription(this, subscriptionName));
         Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName, 0,
-                cnx, cnx.getAuthRole(), metadata, readCompacted, 
initialPosition, keySharedMeta);
+                cnx, cnx.getAuthRole(), metadata, readCompacted, 
initialPosition, keySharedMeta, MessageId.latest);
         addConsumerToSubscription(subscription, consumer).thenRun(() -> {
             if (!cnx.isActive()) {
                 try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 2bd94ff..4bae5b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -153,6 +153,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         }
 
         havePendingRead = false;
+        isFirstRead = false;
 
         if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
             int newReadBatchSize = Math.min(readBatchSize * 2, 
serviceConfig.getDispatcherMaxReadBatchSize());
@@ -338,7 +339,8 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             }
             havePendingRead = true;
             if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
+                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
+                        this, consumer);
             } else {
                 cursor.asyncReadEntriesOrWait(messagesToRead,
                         bytesToRead, this, consumer, 
topic.getMaxReadPosition());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index 82e8d6d..f19031b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -121,6 +121,8 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             havePendingRead = false;
         }
 
+        isFirstRead = false;
+
         if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
             int newReadBatchSize = Math.min(readBatchSize * 2, 
serviceConfig.getDispatcherMaxReadBatchSize());
             if (log.isDebugEnabled()) {
@@ -197,7 +199,8 @@ public class 
PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             havePendingRead = true;
 
             if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, this, consumer);
+                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
+                        this, consumer);
             } else {
                 streamingEntryReader.asyncReadEntries(messagesToRead, 
bytesToRead, consumer);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ad781cd..013710e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -705,7 +705,6 @@ public class PersistentTopic extends AbstractTopic
                         new NotAllowedException("Subscribe limited by 
subscribe rate limit per consumer."));
                 return future;
             }
-
         }
 
         lock.readLock().lock();
@@ -733,7 +732,7 @@ public class PersistentTopic extends AbstractTopic
         subscriptionFuture.thenAccept(subscription -> {
             Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName,
                     maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
-                    readCompacted, initialPosition, keySharedMeta);
+                    readCompacted, initialPosition, keySharedMeta, 
startMessageId);
             addConsumerToSubscription(subscription, consumer).thenAccept(v -> {
                 checkBackloggedCursors();
                 if (!cnx.isActive()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 415c3dc..f74157a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -34,7 +34,7 @@ public interface RawReader {
     static CompletableFuture<RawReader> create(PulsarClient client, String 
topic, String subscription) {
         CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
         RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, 
subscription, future);
-        return future.thenCompose((consumer) -> 
r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
+        return future.thenCompose(x -> 
x.seekAsync(MessageId.earliest)).thenApply(__ -> r);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 88b8e58..4922852 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -22,9 +22,13 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.Consumer;
 
 public interface CompactedTopic {
     CompletableFuture<?> newCompactedLedger(Position p, long 
compactedLedgerId);
-    void asyncReadEntriesOrWait(ManagedCursor cursor, int 
numberOfEntriesToRead,
-                                ReadEntriesCallback callback, Object ctx);
+    void asyncReadEntriesOrWait(ManagedCursor cursor,
+                                int numberOfEntriesToRead,
+                                boolean isFirstRead,
+                                ReadEntriesCallback callback,
+                                Consumer consumer);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 12748e8..21e9a1d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -42,6 +42,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
@@ -81,13 +83,20 @@ public class CompactedTopicImpl implements CompactedTopic {
     }
 
     @Override
-    public void asyncReadEntriesOrWait(ManagedCursor cursor, int 
numberOfEntriesToRead,
-                                       ReadEntriesCallback callback, Object 
ctx) {
+    public void asyncReadEntriesOrWait(ManagedCursor cursor,
+                                       int numberOfEntriesToRead,
+                                       boolean isFirstRead,
+                                       ReadEntriesCallback callback, Consumer 
consumer) {
         synchronized (this) {
-            PositionImpl cursorPosition = (PositionImpl) 
cursor.getReadPosition();
+            PositionImpl cursorPosition;
+            if (isFirstRead && 
MessageId.earliest.equals(consumer.getStartMessageId())){
+                cursorPosition = PositionImpl.earliest;
+            } else {
+                cursorPosition = (PositionImpl) cursor.getReadPosition();
+            }
             if (compactionHorizon == null
                 || compactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
ctx, PositionImpl.latest);
+                cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, 
consumer, PositionImpl.latest);
             } else {
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, 
context.ledger.getLastAddConfirmed(), context.cache)
@@ -96,11 +105,11 @@ public class CompactedTopicImpl implements CompactedTopic {
                             // the cursor just needs to be set to the 
compaction horizon
                             if (startPoint == COMPACT_LEDGER_EMPTY) {
                                 cursor.seek(compactionHorizon.getNext());
-                                
callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                
callback.readEntriesComplete(Collections.emptyList(), consumer);
                                 return CompletableFuture.completedFuture(null);
                             }
                             if (startPoint == NEWER_THAN_COMPACTED && 
compactionHorizon.compareTo(cursorPosition) < 0) {
-                                
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx,
+                                
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer,
                                         PositionImpl.latest);
                                 return CompletableFuture.completedFuture(null);
                             } else {
@@ -108,23 +117,23 @@ public class CompactedTopicImpl implements CompactedTopic 
{
                                                          startPoint + 
numberOfEntriesToRead);
                                 if (startPoint == NEWER_THAN_COMPACTED) {
                                     cursor.seek(compactionHorizon.getNext());
-                                    
callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                    
callback.readEntriesComplete(Collections.emptyList(), consumer);
                                     return 
CompletableFuture.completedFuture(null);
                                 }
                                 return readEntries(context.ledger, startPoint, 
endPoint)
                                     .thenAccept((entries) -> {
                                         Entry lastEntry = 
entries.get(entries.size() - 1);
                                         
cursor.seek(lastEntry.getPosition().getNext());
-                                        callback.readEntriesComplete(entries, 
ctx);
+                                        callback.readEntriesComplete(entries, 
consumer);
                                     });
                             }
                         }))
                     .exceptionally((exception) -> {
                         if (exception.getCause() instanceof 
NoSuchElementException) {
                             cursor.seek(compactionHorizon.getNext());
-                            
callback.readEntriesComplete(Collections.emptyList(), ctx);
+                            
callback.readEntriesComplete(Collections.emptyList(), consumer);
                         } else {
-                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), ctx);
+                            callback.readEntriesFailed(new 
ManagedLedgerException(exception), consumer);
                         }
                         return null;
                     });
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 6cccfee..692ea22 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -73,6 +73,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -299,7 +300,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 2. Add old consumer
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
-                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null);
+                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, 
MessageId.latest);
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -310,7 +311,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 3. Add new consumer
         Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
-                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(), false, InitialPosition.Latest, null);
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest);
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -339,7 +340,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 2. Add consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -363,7 +364,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 5. Add another consumer which does not change active consumer
         Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null));
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest));
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer1.consumerName());
@@ -377,7 +378,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 6. Add a consumer which changes active consumer
         Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
                 "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
         pdfc.addConsumer(consumer0);
         consumers = pdfc.getConsumers();
         assertSame(pdfc.getActiveConsumer().consumerName(), 
consumer0.consumerName());
@@ -460,7 +461,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 2. Add a consumer
         Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 1,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertEquals(1, consumers.size());
@@ -469,7 +470,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 3. Add a consumer with same priority level and consumer name is 
smaller in lexicographic order.
         Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 1,
                 "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null));
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest));
         pdfc.addConsumer(consumer2);
 
         // 4. Verify active consumer doesn't change
@@ -482,7 +483,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 5. Add another consumer which has higher priority level
         Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, 
topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null));
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest));
         pdfc.addConsumer(consumer3);
         consumers = pdfc.getConsumers();
         assertEquals(3, consumers.size());
@@ -672,7 +673,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     private Consumer createConsumer(int priority, int permit, boolean blocked, 
int id) throws Exception {
         Consumer consumer =
                 new Consumer(null, SubType.Shared, "test-topic", id, priority, 
""+id, 5000,
-                        serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+                        serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         try {
             consumer.flowPermits(permit);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9de3277..4a804b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -100,6 +100,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -715,7 +716,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1, 0, "Cons1", 50000, serverCnx,
                 "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest,
-                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
         sub.addConsumer(consumer);
         consumer.close();
 
@@ -726,7 +727,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
             consumer = new Consumer(sub, subType, topic.getName(), 1, 0, 
"Cons1", 50000, serverCnx, "myrole-1",
                     Collections.emptyMap(), false, InitialPosition.Latest,
-                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest);
             sub.addConsumer(consumer);
 
             assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -749,7 +750,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         // 1. simple add consumer
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         sub.addConsumer(consumer);
         assertTrue(sub.getDispatcher().isConsumerConnected());
 
@@ -782,7 +783,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentSubscription sub = new PersistentSubscription(topic, 
"non-durable-sub", cursorMock, false);
 
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1, 0, "Cons1", 50000, serverCnx,
-                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, null);
+                "myrole-1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
 
         sub.addConsumer(consumer);
         assertFalse(sub.getDispatcher().isClosed());
@@ -818,14 +819,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 
1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub, consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Shared, 
topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub, consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -833,7 +834,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Shared, 
topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub, consumer3)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -846,7 +847,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub2, consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -857,7 +858,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub2, consumer5)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -913,14 +914,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 1. add consumer1
         Consumer consumer = new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub, consumer);
         assertEquals(sub.getConsumers().size(), 1);
 
         // 2. add consumer2
         Consumer consumer2 = new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 0,
                 "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub, consumer2);
         assertEquals(sub.getConsumers().size(), 2);
 
@@ -928,7 +929,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer3 = new Consumer(sub, SubType.Failover, 
topic.getName(), 3 /* consumer id */, 0,
                     "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub, consumer3)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -941,7 +942,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // 4. add consumer4 to sub2
         Consumer consumer4 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 4 /* consumer id */, 0,
                 "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */, InitialPosition.Latest, null);
+                false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
         addConsumerToSubscription.invoke(topic, sub2, consumer4);
         assertEquals(sub2.getConsumers().size(), 1);
 
@@ -952,7 +953,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         try {
             Consumer consumer5 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 5 /* consumer id */, 0,
                     "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                    false /* read compacted */, InitialPosition.Latest, null);
+                    false /* read compacted */, InitialPosition.Latest, null, 
MessageId.latest);
             ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, 
sub2, consumer5)).get();
             fail("should have failed");
         } catch (ExecutionException e) {
@@ -1000,7 +1001,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(new PulsarCommandSenderImpl(null, 
cnx)).when(cnx).getCommandSender();
 
         return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 
0, consumerNameBase + consumerId, 50000,
-                cnx, role, Collections.emptyMap(), false, 
InitialPosition.Latest, null);
+                cnx, role, Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
     }
 
     @Test
@@ -1108,7 +1109,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         sub.addConsumer(consumer1);
 
         doAnswer(new Answer<Object>() {
@@ -1132,7 +1133,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             Thread.sleep(10); /* delay to ensure that the ubsubscribe gets 
executed first */
             sub.addConsumer(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */,
                     0, "Cons2"/* consumer name */, 50000, serverCnx,
-                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null)).get();
+                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, InitialPosition.Latest, null, MessageId.latest)).get();
             fail();
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof 
BrokerServiceException.SubscriptionFencedException);
@@ -1934,21 +1935,21 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor1 = ledger.openCursor("c1");
         PersistentSubscription sub1 = new PersistentSubscription(topic, 
"sub-1", cursor1, false);
         Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-            50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+            50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
         sub1.addConsumer(consumer1);
         // Open cursor2, add it into activeCursor-container and add it into 
subscription consumer list
         ManagedCursor cursor2 = ledger.openCursor("c2");
         PersistentSubscription sub2 = new PersistentSubscription(topic, 
"sub-2", cursor2, false);
         Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-            50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+            50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
         sub2.addConsumer(consumer2);
         // Open cursor3, add it into activeCursor-container and do not add it 
into subscription consumer list
         ManagedCursor cursor3 = ledger.openCursor("c3");
         PersistentSubscription sub3 = new PersistentSubscription(topic, 
"sub-3", cursor3, false);
         Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */,
-            50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null);
+            50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest, null, MessageId.latest);
         topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
 
         // Case1: cursors are active as haven't started 
deactivateBacklogCursor scan
@@ -2058,7 +2059,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         Consumer consumer = new Consumer(nonDeletableSubscription1, 
SubType.Shared, topic.getName(), 1, 0, "consumer1",
-                50000, serverCnx, "app1", Collections.emptyMap(), false, 
InitialPosition.Latest, null);
+                50000, serverCnx, "app1", Collections.emptyMap(), false, 
InitialPosition.Latest, null, MessageId.latest);
         addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, 
consumer);
 
         when(pulsar.getConfigurationCache().policiesCache()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
new file mode 100644
index 0000000..a73d1f5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.swagger.models.auth.In;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
+    private ScheduledExecutorService compactionScheduler;
+    private BookKeeper bk;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+        conf.setManagedLedgerMaxEntriesPerLedger(2);
+        super.internalSetup();
+
+        admin.clusters().createCluster("use", 
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-tenant/use/my-ns");
+
+        compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
+        bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, 
Optional.empty(), null);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+
+        if (compactionScheduler != null) {
+            compactionScheduler.shutdownNow();
+        }
+    }
+
+    /**
+     * Compaction should retain expired keys in the compacted view
+     */
+    @Test
+    public void testCompaction() throws Exception {
+        String topic = "persistent://my-tenant/use/my-ns/my-topic-" + 
System.nanoTime();
+
+        Set<String> keys = Sets.newHashSet("a", "b", "c");
+        Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
+        Set<String> allKeys = new HashSet<>();
+        allKeys.addAll(keys);
+        allKeys.addAll(keysToExpire);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+
+        Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, 
compactionScheduler);
+        compactor.compact(topic).join();
+
+        log.info(" ---- X 1: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        int round = 1;
+
+        for (String key : allKeys) {
+            producer.newMessage()
+                    .key(key)
+                    .value(round)
+                    .send();
+        }
+
+        log.info(" ---- X 2: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        validateMessages(pulsarClient, true, topic, round, allKeys);
+
+        compactor.compact(topic).join();
+
+        log.info(" ---- X 3: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        validateMessages(pulsarClient, true, topic, round, allKeys);
+
+        round = 2;
+
+        for (String key : allKeys) {
+            producer.newMessage()
+                    .key(key)
+                    .value(round)
+                    .send();
+        }
+
+        compactor.compact(topic).join();
+
+        validateMessages(pulsarClient, true, topic, round, allKeys);
+
+        // Now explicitly remove the expiring keys
+        for (String key : keysToExpire) {
+            producer.newMessage()
+                    .key(key)
+                    .send();
+        }
+
+        compactor.compact(topic).join();
+
+        log.info(" ---- X 4: {}", mapper.writeValueAsString(
+                admin.topics().getInternalStats(topic, false)));
+
+        validateMessages(pulsarClient, true, topic, round, keys);
+
+        // In the raw topic there should be no messages
+        validateMessages(pulsarClient, false, topic, round, 
Collections.emptySet());
+    }
+
+    private void validateMessages(PulsarClient client, boolean readCompacted, 
String topic, int round, Set<String> expectedKeys)
+            throws Exception {
+        @Cleanup
+        Reader<Integer> reader = client.newReader(Schema.INT32)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(readCompacted)
+                .create();
+
+        Map<String, Integer> receivedValues = new HashMap<>();
+
+        while (true) {
+            Message<Integer> msg = reader.readNext(1, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+
+            Integer value = msg.size() > 0 ? msg.getValue() : null;
+            log.info("Received: {} -- value: {}", msg.getKey(), value);
+            if (value != null) {
+                receivedValues.put(msg.getKey(), value);
+            }
+        }
+
+        Map<String, Integer> expectedReceivedValues = new HashMap<>();
+        expectedKeys.forEach(k -> expectedReceivedValues.put(k, round));
+
+        log.info("Received values: {}", receivedValues);
+        log.info("Expected values: {}", expectedReceivedValues);
+        assertEquals(receivedValues, expectedReceivedValues);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index b921266..4c96048 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.functions.LocalRunner;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
@@ -74,12 +75,19 @@ public class PulsarSinkE2ETest extends 
AbstractPulsarE2ETest {
         final int messageNum = 20;
         final int maxKeys = 10;
         // 1 Setup producer
+        @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .topic(sourceTopic)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
-        
pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close();
+        pulsarClient.newConsumer()
+                .topic(sourceTopic)
+                .subscriptionName(subscriptionName)
+                .readCompacted(true)
+                .subscribe()
+                .close();
+
         // 2 Send messages and record the expected values after compaction
         Map<String, String> expected = new HashMap<>();
         for (int j = 0; j < messageNum; j++) {
@@ -107,18 +115,12 @@ public class PulsarSinkE2ETest extends 
AbstractPulsarE2ETest {
         admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
 
         // 5 Sink should only read compacted value,so we will only receive 
compacted messages
-        retryStrategically((test) -> {
-            try {
-                String prometheusMetrics = 
PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
-                Map<String, PulsarFunctionTestUtils.Metric> metrics = 
PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
-                PulsarFunctionTestUtils.Metric m = 
metrics.get("pulsar_sink_received_total");
-                return m.value == (double) maxKeys;
-            } catch (Exception e) {
-                return false;
-            }
-        }, 50, 1000);
-
-        producer.close();
+        Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+            String prometheusMetrics = 
PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+            Map<String, PulsarFunctionTestUtils.Metric> metrics = 
PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+            PulsarFunctionTestUtils.Metric m = 
metrics.get("pulsar_sink_received_total");
+            assertEquals(m.value, maxKeys);
+        });
     }
 
     @Test(timeOut = 30000)

Reply via email to