This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c39726219aeb48b68f637a00e10b1208223a099 Author: Matteo Merli <[email protected]> AuthorDate: Tue Jul 13 16:56:01 2021 +0200 Fixed retention of keys in compaction (#11287) This change fixes few issues in the compaction mechanism, the * When a reader is created, reading from "earliest" message, it should read the compacted data and then continue from the next message. * When the compaction consumer starts, it shouldn't seek to the beginning. This causes 2 issues: * Rescanning of the topic each time the compaction runs * Keys that are being dropped from the topic are also getting dropped from the compacted view, while in fact they should be there until explicitly deleted (with an empty message for a key). The main source of the problem is that when creating a cursor on "earliest" message, the cursor gets automatically adjusted on the earliest message available to read. This confuses the check for the read-compacted because it may think the reader/consumer is already ahead of the compaction horizon. Introduced a "isFirstRead" flag to make sure we double check the start message id and use `MessageId.earliest` instead of the earliest available message to read on the topic. After the first read, the positioning will be fine. (cherry picked from commit feb4ff19e097a9d8f13b093e8fb25dc12c31227b) --- .../AbstractDispatcherSingleActiveConsumer.java | 6 + .../org/apache/pulsar/broker/service/Consumer.java | 12 +- .../service/nonpersistent/NonPersistentTopic.java | 2 +- .../PersistentDispatcherSingleActiveConsumer.java | 4 +- ...entStreamingDispatcherSingleActiveConsumer.java | 5 +- .../broker/service/persistent/PersistentTopic.java | 3 +- .../org/apache/pulsar/client/api/RawReader.java | 2 +- .../apache/pulsar/compaction/CompactedTopic.java | 8 +- .../pulsar/compaction/CompactedTopicImpl.java | 29 ++- .../PersistentDispatcherFailoverConsumerTest.java | 19 +- .../pulsar/broker/service/PersistentTopicTest.java | 43 ++-- .../pulsar/compaction/CompactionRetentionTest.java | 229 +++++++++++++++++++++ .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 28 +-- 13 files changed, 328 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index e73daaa..690a598 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -55,6 +55,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas AtomicIntegerFieldUpdater.newUpdater(AbstractDispatcherSingleActiveConsumer.class, "isClosed"); private volatile int isClosed = FALSE; + protected boolean isFirstRead = true; + public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, ServiceConfiguration serviceConfig) { @@ -159,6 +161,10 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas isKeyHashRangeFiltered = false; } + if (consumers.isEmpty()) { + isFirstRead = true; + } + consumers.add(consumer); if (!pickAndScheduleActiveConsumer()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index f093cd6..4cfc089 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; @@ -123,12 +124,13 @@ public class Consumer { private boolean preciseDispatcherFlowControl; private PositionImpl readPositionWhenJoining; private final String clientAddress; // IP address only, no port number included + private final MessageId startMessageId; public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, - KeySharedMeta keySharedMeta) { + KeySharedMeta keySharedMeta, MessageId startMessageId) { this.subscription = subscription; this.subType = subType; @@ -148,6 +150,10 @@ public class Consumer { this.bytesOutCounter = new LongAdder(); this.msgOutCounter = new LongAdder(); this.appId = appId; + + // Ensure we start from compacted view + this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId; + this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl(); PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0); @@ -835,5 +841,9 @@ public class Consumer { return clientAddress; } + public MessageId getStartMessageId() { + return startMessageId; + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 5807d00..0c7b965 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -272,7 +272,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName)); Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); + cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest); addConsumerToSubscription(subscription, consumer).thenRun(() -> { if (!cnx.isActive()) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 2bd94ff..4bae5b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -153,6 +153,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher } havePendingRead = false; + isFirstRead = false; if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); @@ -338,7 +339,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher } havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index 82e8d6d..f19031b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -121,6 +121,8 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten havePendingRead = false; } + isFirstRead = false; + if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) { int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize()); if (log.isDebugEnabled()) { @@ -197,7 +199,8 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + this, consumer); } else { streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ad781cd..013710e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -705,7 +705,6 @@ public class PersistentTopic extends AbstractTopic new NotAllowedException("Subscribe limited by subscribe rate limit per consumer.")); return future; } - } lock.readLock().lock(); @@ -733,7 +732,7 @@ public class PersistentTopic extends AbstractTopic subscriptionFuture.thenAccept(subscription -> { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, - readCompacted, initialPosition, keySharedMeta); + readCompacted, initialPosition, keySharedMeta, startMessageId); addConsumerToSubscription(subscription, consumer).thenAccept(v -> { checkBackloggedCursors(); if (!cnx.isActive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 415c3dc..f74157a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -34,7 +34,7 @@ public interface RawReader { static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) { CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>(); RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future); - return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r); + return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 88b8e58..4922852 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -22,9 +22,13 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; public interface CompactedTopic { CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId); - void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx); + void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, + Consumer consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 12748e8..21e9a1d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -42,6 +42,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -81,13 +83,20 @@ public class CompactedTopicImpl implements CompactedTopic { } @Override - public void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead, - ReadEntriesCallback callback, Object ctx) { + public void asyncReadEntriesOrWait(ManagedCursor cursor, + int numberOfEntriesToRead, + boolean isFirstRead, + ReadEntriesCallback callback, Consumer consumer) { synchronized (this) { - PositionImpl cursorPosition = (PositionImpl) cursor.getReadPosition(); + PositionImpl cursorPosition; + if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){ + cursorPosition = PositionImpl.earliest; + } else { + cursorPosition = (PositionImpl) cursor.getReadPosition(); + } if (compactionHorizon == null || compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, PositionImpl.latest); + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); } else { compactedTopicContext.thenCompose( (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) @@ -96,11 +105,11 @@ public class CompactedTopicImpl implements CompactedTopic { // the cursor just needs to be set to the compaction horizon if (startPoint == COMPACT_LEDGER_EMPTY) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx, + cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, consumer, PositionImpl.latest); return CompletableFuture.completedFuture(null); } else { @@ -108,23 +117,23 @@ public class CompactedTopicImpl implements CompactedTopic { startPoint + numberOfEntriesToRead); if (startPoint == NEWER_THAN_COMPACTED) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); return CompletableFuture.completedFuture(null); } return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { Entry lastEntry = entries.get(entries.size() - 1); cursor.seek(lastEntry.getPosition().getNext()); - callback.readEntriesComplete(entries, ctx); + callback.readEntriesComplete(entries, consumer); }); } })) .exceptionally((exception) -> { if (exception.getCause() instanceof NoSuchElementException) { cursor.seek(compactionHorizon.getNext()); - callback.readEntriesComplete(Collections.emptyList(), ctx); + callback.readEntriesComplete(Collections.emptyList(), consumer); } else { - callback.readEntriesFailed(new ManagedLedgerException(exception), ctx); + callback.readEntriesFailed(new ManagedLedgerException(exception), consumer); } return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 6cccfee..692ea22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -73,6 +73,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -299,7 +300,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer1); List<Consumer> consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -310,7 +311,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -339,7 +340,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List<Consumer> consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -363,7 +364,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); @@ -377,7 +378,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName()); @@ -460,7 +461,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer1); List<Consumer> consumers = pdfc.getConsumers(); assertEquals(1, consumers.size()); @@ -469,7 +470,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer2); // 4. Verify active consumer doesn't change @@ -482,7 +483,7 @@ public class PersistentDispatcherFailoverConsumerTest { // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); @@ -672,7 +673,7 @@ public class PersistentDispatcherFailoverConsumerTest { private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, - serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); try { consumer.flowPermits(permit); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 9de3277..4a804b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -100,6 +100,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -715,7 +716,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); consumer.close(); @@ -726,7 +727,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -749,7 +750,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -782,7 +783,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer); assertFalse(sub.getDispatcher().isClosed()); @@ -818,14 +819,14 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -833,7 +834,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { try { Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -846,7 +847,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -857,7 +858,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { try { Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -913,14 +914,14 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub, consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -928,7 +929,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { try { Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, sub, consumer3)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -941,7 +942,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, sub2, consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -952,7 +953,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { try { Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); ((CompletableFuture<Void>) addConsumerToSubscription.invoke(topic, sub2, consumer5)).get(); fail("should have failed"); } catch (ExecutionException e) { @@ -1000,7 +1001,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(new PulsarCommandSenderImpl(null, cnx)).when(cnx).getCommandSender(); return new Consumer(sub, SubType.Shared, topic.getName(), consumerId, 0, consumerNameBase + consumerId, 50000, - cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null); + cnx, role, Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); } @Test @@ -1108,7 +1109,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); sub.addConsumer(consumer1); doAnswer(new Answer<Object>() { @@ -1132,7 +1133,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ sub.addConsumer(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)).get(); + "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest)).get(); fail(); } catch (Exception e) { assertTrue(e.getCause() instanceof BrokerServiceException.SubscriptionFencedException); @@ -1934,21 +1935,21 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { ManagedCursor cursor1 = ledger.openCursor("c1"); PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false); Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1); sub1.addConsumer(consumer1); // Open cursor2, add it into activeCursor-container and add it into subscription consumer list ManagedCursor cursor2 = ledger.openCursor("c2"); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false); Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2); sub2.addConsumer(consumer2); // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list ManagedCursor cursor3 = ledger.openCursor("c3"); PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false); Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, MessageId.latest); topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3); // Case1: cursors are active as haven't started deactivateBacklogCursor scan @@ -2058,7 +2059,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { addConsumerToSubscription.setAccessible(true); Consumer consumer = new Consumer(nonDeletableSubscription1, SubType.Shared, topic.getName(), 1, 0, "consumer1", - 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null); + 50000, serverCnx, "app1", Collections.emptyMap(), false, InitialPosition.Latest, null, MessageId.latest); addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, consumer); when(pulsar.getConfigurationCache().policiesCache() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java new file mode 100644 index 0000000..a73d1f5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.swagger.models.auth.In; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + conf.setManagedLedgerMaxEntriesPerLedger(2); + super.internalSetup(); + + admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-tenant", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-tenant/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } + } + + /** + * Compaction should retain expired keys in the compacted view + */ + @Test + public void testCompaction() throws Exception { + String topic = "persistent://my-tenant/use/my-ns/my-topic-" + System.nanoTime(); + + Set<String> keys = Sets.newHashSet("a", "b", "c"); + Set<String> keysToExpire = Sets.newHashSet("x1", "x2"); + Set<String> allKeys = new HashSet<>(); + allKeys.addAll(keys); + allKeys.addAll(keysToExpire); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.INDENT_OUTPUT, true); + + @Cleanup + Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .create(); + + Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic).join(); + + log.info(" ---- X 1: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + int round = 1; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + log.info(" ---- X 2: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + compactor.compact(topic).join(); + + log.info(" ---- X 3: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + round = 2; + + for (String key : allKeys) { + producer.newMessage() + .key(key) + .value(round) + .send(); + } + + compactor.compact(topic).join(); + + validateMessages(pulsarClient, true, topic, round, allKeys); + + // Now explicitly remove the expiring keys + for (String key : keysToExpire) { + producer.newMessage() + .key(key) + .send(); + } + + compactor.compact(topic).join(); + + log.info(" ---- X 4: {}", mapper.writeValueAsString( + admin.topics().getInternalStats(topic, false))); + + validateMessages(pulsarClient, true, topic, round, keys); + + // In the raw topic there should be no messages + validateMessages(pulsarClient, false, topic, round, Collections.emptySet()); + } + + private void validateMessages(PulsarClient client, boolean readCompacted, String topic, int round, Set<String> expectedKeys) + throws Exception { + @Cleanup + Reader<Integer> reader = client.newReader(Schema.INT32) + .topic(topic) + .startMessageId(MessageId.earliest) + .readCompacted(readCompacted) + .create(); + + Map<String, Integer> receivedValues = new HashMap<>(); + + while (true) { + Message<Integer> msg = reader.readNext(1, TimeUnit.SECONDS); + if (msg == null) { + break; + } + + Integer value = msg.size() > 0 ? msg.getValue() : null; + log.info("Received: {} -- value: {}", msg.getKey(), value); + if (value != null) { + receivedValues.put(msg.getKey(), value); + } + } + + Map<String, Integer> expectedReceivedValues = new HashMap<>(); + expectedKeys.forEach(k -> expectedReceivedValues.put(k, round)); + + log.info("Received values: {}", receivedValues); + log.info("Expected values: {}", expectedReceivedValues); + assertEquals(receivedValues, expectedReceivedValues); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index b921266..4c96048 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.apache.pulsar.functions.LocalRunner; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils; +import org.awaitility.Awaitility; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -74,12 +75,19 @@ public class PulsarSinkE2ETest extends AbstractPulsarE2ETest { final int messageNum = 20; final int maxKeys = 10; // 1 Setup producer + @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING) .topic(sourceTopic) .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); - pulsarClient.newConsumer().topic(sourceTopic).subscriptionName(subscriptionName).readCompacted(true).subscribe().close(); + pulsarClient.newConsumer() + .topic(sourceTopic) + .subscriptionName(subscriptionName) + .readCompacted(true) + .subscribe() + .close(); + // 2 Send messages and record the expected values after compaction Map<String, String> expected = new HashMap<>(); for (int j = 0; j < messageNum; j++) { @@ -107,18 +115,12 @@ public class PulsarSinkE2ETest extends AbstractPulsarE2ETest { admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl); // 5 Sink should only read compacted value,so we will only receive compacted messages - retryStrategically((test) -> { - try { - String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); - Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); - PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); - return m.value == (double) maxKeys; - } catch (Exception e) { - return false; - } - }, 50, 1000); - - producer.close(); + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get()); + Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics); + PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total"); + assertEquals(m.value, maxKeys); + }); } @Test(timeOut = 30000)
