This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 80f921a45bb [fix][broker] Fix issue with consumer read uncommitted
messages from compacted topic (#21465)
80f921a45bb is described below
commit 80f921a45bb023fca36faf98038f3ec687e05f16
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Nov 3 19:03:10 2023 +0800
[fix][broker] Fix issue with consumer read uncommitted messages from
compacted topic (#21465)
---
.../PersistentDispatcherSingleActiveConsumer.java | 6 ++-
.../apache/pulsar/compaction/CompactedTopic.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 3 +-
.../pulsar/compaction/CompactedTopicUtils.java | 10 ++--
.../pulsar/broker/transaction/TransactionTest.java | 55 ++++++++++++++++++++++
.../pulsar/compaction/CompactedTopicUtilsTest.java | 4 +-
6 files changed, 71 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d96429693fd..5e9183df0b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
+import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -350,8 +351,9 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead &&
MessageId.earliest.equals(consumer.getStartMessageId());
-
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(),
cursor,
- messagesToRead, bytesToRead, readFromEarliest,
this, true, consumer);
+ TopicCompactionService topicCompactionService =
topic.getTopicCompactionService();
+
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor,
messagesToRead,
+ bytesToRead, topic.getMaxReadPosition(),
readFromEarliest, this, true, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer,
consumer.getConsumerEpoch());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 8c17e0f3ca3..146ba4327d2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -24,6 +24,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
public interface CompactedTopic {
@@ -34,12 +35,14 @@ public interface CompactedTopic {
* Read entries from compacted topic.
*
* @deprecated Use {@link
CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService,
ManagedCursor,
- * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
+ * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean,
ReadEntriesCallback, boolean, Consumer)}
+ * instead.
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
+ PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b028b708c49..8794e2736d4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -96,6 +96,7 @@ public class CompactedTopicImpl implements CompactedTopic {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
+ PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer
consumer) {
PositionImpl cursorPosition;
@@ -112,7 +113,7 @@ public class CompactedTopicImpl implements CompactedTopic {
if (currentCompactionHorizon == null
|| currentCompactionHorizon.compareTo(cursorPosition) < 0) {
- cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx, PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx, maxReadPosition);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead =
managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index 66bcf4c3002..d3464d402e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -42,8 +42,8 @@ public class CompactedTopicUtils {
@Beta
public static void asyncReadCompactedEntries(TopicCompactionService
topicCompactionService,
ManagedCursor cursor, int
maxEntries,
- long bytesToRead, boolean
readFromEarliest,
-
AsyncCallbacks.ReadEntriesCallback callback,
+ long bytesToRead,
PositionImpl maxReadPosition,
+ boolean readFromEarliest,
AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable
Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
@@ -68,11 +68,9 @@ public class CompactedTopicUtils {
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(),
lastCompactedPosition.getEntryId()) > 0) {
if (wait) {
- cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx,
- PositionImpl.LATEST);
+ cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead,
callback, readEntriesCtx, maxReadPosition);
} else {
- cursor.asyncReadEntries(maxEntries, bytesToRead, callback,
readEntriesCtx,
- PositionImpl.LATEST);
+ cursor.asyncReadEntries(maxEntries, bytesToRead, callback,
readEntriesCtx, maxReadPosition);
}
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index cf389824794..e4cc33de14b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1783,4 +1783,59 @@ public class TransactionTest extends TransactionTestBase
{
});
}
+ @Test
+ public void testReadCommittedWithReadCompacted() throws Exception{
+ final String namespace = "tnx/ns-prechecks";
+ final String topic = "persistent://" + namespace +
"/test_transaction_topic";
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+
+ admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
+
+ @Cleanup
+ Consumer<String> consumer =
this.pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .readCompacted(true)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer =
this.pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.newMessage().key("K1").value("V1").send();
+
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+ producer.newMessage(txn).key("K2").value("V2").send();
+ producer.newMessage(txn).key("K3").value("V3").send();
+
+ List<String> messages = new ArrayList<>();
+ while (true) {
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message.getValue());
+ }
+
+ Assert.assertEquals(messages, List.of("V1"));
+
+ txn.commit();
+
+ messages.clear();
+
+ while (true) {
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messages.add(message.getValue());
+ }
+
+ Assert.assertEquals(messages, List.of("V2", "V3"));
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
index 94f2a17a2a3..2545c0362e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java
@@ -69,8 +69,8 @@ public class CompactedTopicUtilsTest {
}
};
- CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
false,
- readEntriesCallback, false, null);
+ CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
+ PositionImpl.LATEST, false, readEntriesCallback, false, null);
List<Entry> entries = completableFuture.get();
Assert.assertTrue(entries.isEmpty());