codelipenghui commented on a change in pull request #14170:
URL: https://github.com/apache/pulsar/pull/14170#discussion_r801695158
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -160,6 +160,10 @@ public int
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
if (!isReplayRead && msgMetadata != null &&
msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
+ // because consumer can receive message is smaller than
maxReadPosition,
+ // so this marker is no value for this subscription
Review comment:
```suggestion
// so this marker is useless for this subscription
```
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -160,6 +160,10 @@ public int
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
if (!isReplayRead && msgMetadata != null &&
msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
+ // because consumer can receive message is smaller than
maxReadPosition,
+ // so this marker is no value for this subscription
+
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()),
AckType.Individual,
+ Collections.emptyMap());
Review comment:
We can add it to `entriesToFiltered` first to avoid multiple calls for
subscription.acknowledgeMessage().
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
##########
@@ -97,84 +91,66 @@ public void testMarkerDeleteTimes() throws Exception {
@Test
public void testMarkerDelete() throws Exception {
-
- MessageMetadata msgMetadata = new MessageMetadata().clear()
- .setPublishTime(1)
- .setProducerName("test")
- .setSequenceId(0);
-
- ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
- payload =
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
- msgMetadata, payload);
-
- ManagedLedger managedLedger =
pulsar.getManagedLedgerFactory().open("test");
- PersistentTopic topic = mock(PersistentTopic.class);
- doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
- doReturn(managedLedger).when(topic).getManagedLedger();
- doReturn("test").when(topic).getName();
- ManagedCursor cursor = managedLedger.openCursor("test");
- PersistentSubscription persistentSubscription = new
PersistentSubscription(topic, "test",
- managedLedger.openCursor("test"), false);
-
- byte[] payloadBytes = toBytes(payload);
- Position position1 = managedLedger.addEntry(payloadBytes);
- Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
- .newTxnCommitMarker(1, 1, 1)));
-
- Position position2 = managedLedger.addEntry(payloadBytes);
- Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- Position position3 = managedLedger.addEntry(payloadBytes);
-
- assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
- assertTrue(((PositionImpl)
cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
- // ack position1, markerDeletePosition to markerPosition1
-
persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
- AckType.Individual, Collections.emptyMap());
-
- // ack position1, markerDeletePosition to markerPosition1
- Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
- ((PositionImpl)
persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition1) == 0);
-
- // ack position2, markerDeletePosition to markerPosition2
-
persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
- AckType.Individual, Collections.emptyMap());
-
- Awaitility.await().until(() ->
- ((PositionImpl)
persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition2) == 0);
-
- // add consequent marker
- managedLedger.addEntry(toBytes(Markers
- .newTxnCommitMarker(1, 1, 1)));
-
- managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
- .newTxnAbortMarker(1, 1, 1)));
-
- // ack with transaction, then commit this transaction
- persistentSubscription.transactionIndividualAcknowledge(new TxnID(0,
0),
- Collections.singletonList(MutablePair.of((PositionImpl)
position3, 0))).get();
-
- persistentSubscription.endTxn(0, 0, 0, 0).get();
-
- // ack with transaction, then commit this transaction
- Awaitility.await().until(() ->
- ((PositionImpl)
persistentSubscription.getCursor().getMarkDeletedPosition())
- .compareTo((PositionImpl) markerPosition3) == 0);
-
+ final String subName = "testMarkerDelete";
+ final String topicName = NAMESPACE1 + "/testMarkerDelete";
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient
+ .newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .isAckReceiptEnabled(true)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topicName)
+ .create();
+
+ Transaction txn1 = getTxn();
+ Transaction txn2 = getTxn();
+ Transaction txn3 = getTxn();
+
+ MessageIdImpl msgId1 = (MessageIdImpl)
producer.newMessage(txn1).send();
+ MessageIdImpl msgId2 = (MessageIdImpl)
producer.newMessage(txn2).send();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+ txn1.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition move to msgId1, msgId2 have not be committed
+
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId1.getLedgerId(),
msgId1.getEntryId()).toString());
+
+ MessageIdImpl msgId3 = (MessageIdImpl)
producer.newMessage(txn3).send();
+ txn2.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition move to txn1 marker, so entryId is
msgId2.getEntryId() + 1,
+ // because send msgId2 before commit txn1
+
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() +
1).toString());
+
+ txn3.commit().get();
+
+ consumer.acknowledgeAsync(consumer.receive()).get();
+ assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+ // maxReadPosition move to txn3 marker, so entryId is
msgId3.getEntryId() + 2, this is txn3 marker position
+ // send msgId2 before txn2 commit
+
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+ PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() +
2).toString());
Review comment:
Reach here, which means after msg3, there 2 commit markers for msg2 and
msg3 right?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
##########
@@ -97,84 +91,66 @@ public void testMarkerDeleteTimes() throws Exception {
@Test
public void testMarkerDelete() throws Exception {
Review comment:
For the test we also need to cover the abort markers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]