codelipenghui commented on a change in pull request #14170:
URL: https://github.com/apache/pulsar/pull/14170#discussion_r802182871



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -160,6 +160,10 @@ public int 
filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
             if (!isReplayRead && msgMetadata != null && 
msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
+                    // because consumer can receive message is smaller than 
maxReadPosition,
+                    // so this marker is unless for this subscription

Review comment:
       ```suggestion
                       // so this marker is useless for this subscription
       ```

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
##########
@@ -97,84 +92,74 @@ public void testMarkerDeleteTimes() throws Exception {
 
     @Test
     public void testMarkerDelete() throws Exception {
-
-        MessageMetadata msgMetadata = new MessageMetadata().clear()
-                .setPublishTime(1)
-                .setProducerName("test")
-                .setSequenceId(0);
-
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
-        payload = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                msgMetadata, payload);
-
-        ManagedLedger managedLedger = 
pulsar.getManagedLedgerFactory().open("test");
-        PersistentTopic topic = mock(PersistentTopic.class);
-        doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
-        doReturn(managedLedger).when(topic).getManagedLedger();
-        doReturn("test").when(topic).getName();
-        ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = new 
PersistentSubscription(topic, "test",
-                managedLedger.openCursor("test"), false);
-
-        byte[] payloadBytes = toBytes(payload);
-        Position position1 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        Position position2 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position position3 = managedLedger.addEntry(payloadBytes);
-
-        assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
-        assertTrue(((PositionImpl) 
cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
-        // ack position1, markerDeletePosition to markerPosition1
-        
persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
-                AckType.Individual, Collections.emptyMap());
-
-        // ack position1, markerDeletePosition to markerPosition1
-        Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
-                ((PositionImpl) 
persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition1) == 0);
-
-        // ack position2, markerDeletePosition to markerPosition2
-        
persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
-                AckType.Individual, Collections.emptyMap());
-
-        Awaitility.await().until(() ->
-                ((PositionImpl) 
persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition2) == 0);
-
-        // add consequent marker
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        // ack with transaction, then commit this transaction
-        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 
0),
-                Collections.singletonList(MutablePair.of((PositionImpl) 
position3, 0))).get();
-
-        persistentSubscription.endTxn(0, 0, 0, 0).get();
-
-        // ack with transaction, then commit this transaction
-        Awaitility.await().until(() ->
-                ((PositionImpl) 
persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition3) == 0);
-
+        final String subName = "testMarkerDelete";
+        final String topicName = NAMESPACE1 + "/testMarkerDelete";
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        Transaction txn1 = getTxn();
+        Transaction txn2 = getTxn();
+        Transaction txn3 = getTxn();
+        Transaction txn4 = getTxn();
+
+        MessageIdImpl msgId1 = (MessageIdImpl) 
producer.newMessage(txn1).send();
+        MessageIdImpl msgId2 = (MessageIdImpl) 
producer.newMessage(txn2).send();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+        txn1.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to msgId1, msgId2 have not be committed
+        
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId1.getLedgerId(), 
msgId1.getEntryId()).toString());
+
+        MessageIdImpl msgId3 = (MessageIdImpl) 
producer.newMessage(txn3).send();
+        txn2.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn1 marker, so entryId is 
msgId2.getEntryId() + 1,
+        // because send msgId2 before commit txn1
+        
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 
1).toString());
+
+        MessageIdImpl msgId4 = (MessageIdImpl) 
producer.newMessage(txn4).send();
+        txn3.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition move to txn2 marker, because msgId4 have not be 
committed
+        
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 
1).toString());
+
+        txn4.abort().get();
+
+        // maxReadPosition move to txn4 abort marker, so entryId is 
msgId4.getEntryId() + 1

Review comment:
       Shouldn't this be msgId4.getEntryId() + 2? The entries after msgId4 are
the txn3 commit marker (+1) and then the txn4 abort marker (+2).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to