This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e99bf9a4a77a90f14956909a849f927fb1052a81
Author: Lari Hotari <[email protected]>
AuthorDate: Tue May 14 07:27:41 2024 +0300

    [fix][broker] fix replicated subscriptions for transactional messages 
(#22452)
    
    (cherry picked from commit 9fd1b61fc45d06348af0241f002966087f1822a0)
    
    # Conflicts:
    #       
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
---
 .../broker/service/persistent/PersistentTopic.java |  26 +--
 .../ReplicatedSubscriptionsController.java         |   4 +-
 .../transaction/buffer/TransactionBuffer.java      |   3 +-
 .../buffer/impl/InMemTransactionBuffer.java        |  13 +-
 .../buffer/impl/TopicTransactionBuffer.java        |  70 ++++++--
 .../buffer/impl/TransactionBufferDisable.java      |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   4 +-
 .../broker/service/ReplicatorSubscriptionTest.java |  25 +++
 .../TransactionalReplicateSubscriptionTest.java    | 182 +++++++++++++++++++++
 .../broker/transaction/TransactionProduceTest.java |  36 ++++
 .../pulsar/broker/transaction/TransactionTest.java |   2 +-
 11 files changed, 343 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 472387a0a9b..5eb0600a9fd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -128,6 +128,7 @@ import 
org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -266,9 +267,13 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Getter
     protected final TransactionBuffer transactionBuffer;
+    @Getter
+    private final TopicTransactionBuffer.MaxReadPositionCallBack 
maxReadPositionCallBack =
+            (oldPosition, newPosition) -> 
updateMaxReadPositionMovedForwardTimestamp();
 
-    // Record the last time a data message (ie: not an internal Pulsar marker) 
is published on the topic
-    private volatile long lastDataMessagePublishedTimestamp = 0;
+    // Record the last time max read position is moved forward, unless it's a 
marker message.
+    @Getter
+    private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
     @Getter
     private final ExecutorService orderedExecutor;
 
@@ -380,7 +385,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         } else {
             this.transactionBuffer = new TransactionBufferDisable(this);
         }
-        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry());
+        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry(), true);
         if (ledger instanceof ShadowManagedLedgerImpl) {
             shadowSourceTopic = 
TopicName.get(ledger.getConfig().getShadowSource());
         } else {
@@ -681,6 +686,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
+    private void updateMaxReadPositionMovedForwardTimestamp() {
+        lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();
+    }
+
     @Override
     public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
         PublishContext publishContext = (PublishContext) ctx;
@@ -689,12 +698,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         // Message has been successfully persisted
         messageDeduplication.recordMessagePersisted(publishContext, position);
 
-        if (!publishContext.isMarkerMessage()) {
-            lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
-        }
-
         // in order to sync the max position when cursor read entries
-        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry());
+        transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry(),
+                publishContext.isMarkerMessage());
         publishContext.setMetadataFromEntryData(entryData);
         publishContext.completed(null, position.getLedgerId(), 
position.getEntryId());
         decrementPendingWriteOpsAndCheck();
@@ -4101,10 +4107,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return transactionBuffer.clearSnapshot().thenCompose(__ -> 
transactionBuffer.closeAsync());
     }
 
-    public long getLastDataMessagePublishedTimestamp() {
-        return lastDataMessagePublishedTimestamp;
-    }
-
     public Optional<TopicName> getShadowSourceTopic() {
         return Optional.ofNullable(shadowSourceTopic);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index e011ed8d660..3a796b3e96d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -206,8 +206,8 @@ public class ReplicatedSubscriptionsController implements 
AutoCloseable, Topic.P
     private void startNewSnapshot() {
         cleanupTimedOutSnapshots();
 
-        if (topic.getLastDataMessagePublishedTimestamp() < 
lastCompletedSnapshotStartTime
-                || topic.getLastDataMessagePublishedTimestamp() == 0) {
+        if (topic.getLastMaxReadPositionMovedForwardTimestamp() < 
lastCompletedSnapshotStartTime
+                || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
             // There was no message written since the last snapshot, we can 
skip creating a new snapshot
             if (log.isDebugEnabled()) {
                 log.debug("[{}] There is no new data in topic. Skipping 
snapshot creation.", topic.getName());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 7eb5d6f789c..79eb263473f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -149,8 +149,9 @@ public interface TransactionBuffer {
     /**
      * Sync max read position for normal publish.
      * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is a marker message.
      */
-    void syncMaxReadPositionForNormalPublish(PositionImpl position);
+    void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean 
isMarkerMessage);
 
     /**
      * Get the can read max position.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 978536c5f4e..bab7b64c608 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -213,11 +214,17 @@ class InMemTransactionBuffer implements TransactionBuffer 
{
     final ConcurrentMap<TxnID, TxnBuffer> buffers;
     final Map<Long, Set<TxnID>> txnIndex;
     private final Topic topic;
+    private final TopicTransactionBuffer.MaxReadPositionCallBack 
maxReadPositionCallBack;
 
     public InMemTransactionBuffer(Topic topic) {
         this.buffers = new ConcurrentHashMap<>();
         this.txnIndex = new HashMap<>();
         this.topic = topic;
+        if (topic instanceof PersistentTopic) {
+            this.maxReadPositionCallBack = ((PersistentTopic) 
topic).getMaxReadPositionCallBack();
+        } else {
+            this.maxReadPositionCallBack = null;
+        }
     }
 
     @Override
@@ -369,8 +376,10 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
-        //no-op
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
+        if (!isMarkerMessage && maxReadPositionCallBack != null) {
+            maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 81c9ecfc728..dfb73815e08 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -103,6 +103,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
 
     private final AbortedTxnProcessor.SnapshotType snapshotType;
+    private final MaxReadPositionCallBack maxReadPositionCallBack;
 
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
@@ -120,6 +121,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             snapshotAbortedTxnProcessor = new 
SingleSnapshotAbortedTxnProcessorImpl(topic);
             snapshotType = AbortedTxnProcessor.SnapshotType.Single;
         }
+        this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.recover();
     }
 
@@ -175,7 +177,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                                     if (Markers.isTxnAbortMarker(msgMetadata)) 
{
                                         
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
                                     }
-                                    updateMaxReadPosition(txnID);
+                                    removeTxnAndUpdateMaxReadPosition(txnID);
                                 } else {
                                     handleTransactionMessage(txnID, position);
                                 }
@@ -290,7 +292,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             ongoingTxns.put(txnId, (PositionImpl) position);
             PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());
             // max read position is less than first ongoing transaction 
message position
-            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(firstPosition);
+            updateMaxReadPosition(((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(firstPosition),
+                    false);
         }
     }
 
@@ -314,7 +317,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     @Override
                     public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
                         synchronized (TopicTransactionBuffer.this) {
-                            updateMaxReadPosition(txnID);
+                            removeTxnAndUpdateMaxReadPosition(txnID);
                             handleLowWaterMark(txnID, lowWaterMark);
                             
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
@@ -361,7 +364,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
                         synchronized (TopicTransactionBuffer.this) {
                             
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) 
position);
-                            updateMaxReadPosition(txnID);
+                            removeTxnAndUpdateMaxReadPosition(txnID);
                             
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
                             txnAbortedCounter.increment();
@@ -444,17 +447,39 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * Remove the specified transaction from the ongoing transaction list and 
update the max read position.
+     * @param txnID the ID of the transaction to remove.
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionCount.getAndIncrement();
+    }
+
+    /**
+     * Update the max read position. If the new position is greater than the 
current max read position,
+     * the callback is triggered, unless disableCallback is true.
+     * Currently, the callback is only used to update 
lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to 
the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether to disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean 
disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {
+                this.changeMaxReadPositionCount.getAndIncrement();
+            }
+            if (!disableCallback) {
+                
maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, 
this.maxReadPosition);
+            }
         }
     }
 
@@ -479,17 +504,22 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is a marker message; in that 
case, we
+     *                       don't need to trigger the callback to update 
lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is 
can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't 
content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionCount.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);
                 }
             }
         }
@@ -674,6 +704,18 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         }
     }
 
+    /**
+     * A functional interface to handle the max read position move forward.
+     */
+    public interface MaxReadPositionCallBack {
+        /**
+         * callback method when max read position move forward.
+         * @param oldPosition the old max read position.
+         * @param newPosition the new max read position.
+         */
+        void maxReadPositionMovedForward(PositionImpl oldPosition, 
PositionImpl newPosition);
+    }
+
     static class FillEntryQueueCallback implements 
AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 9de0888ae5b..ebd61dbaa82 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -42,8 +43,14 @@ import org.apache.pulsar.common.util.FutureUtil;
 public class TransactionBufferDisable implements TransactionBuffer {
 
     private final Topic topic;
+    private final TopicTransactionBuffer.MaxReadPositionCallBack 
maxReadPositionCallBack;
     public TransactionBufferDisable(Topic topic) {
         this.topic = topic;
+        if (topic instanceof PersistentTopic) {
+            this.maxReadPositionCallBack = ((PersistentTopic) 
topic).getMaxReadPositionCallBack();
+        } else {
+            this.maxReadPositionCallBack = null;
+        }
     }
 
     @Override
@@ -91,8 +98,10 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
     }
 
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
-        //no-op
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, 
boolean isMarkerMessage) {
+        if (!isMarkerMessage && maxReadPositionCallBack != null) {
+            maxReadPositionCallBack.maxReadPositionMovedForward(null, 
position);
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 412dfb381c8..bf57586e517 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -298,6 +298,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        long lastMaxReadPositionMovedForwardTimestamp = 
topic.getLastMaxReadPositionMovedForwardTimestamp();
+
         /*
          * MessageMetadata.Builder messageMetadata = 
MessageMetadata.newBuilder();
          * messageMetadata.setPublishTime(System.currentTimeMillis()); 
messageMetadata.setProducerName("producer-name");
@@ -322,10 +324,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 assertEquals(entryData.array(), payload.array());
             }
         };
-
         topic.publishMessage(payload, publishContext);
 
         assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > 
lastMaxReadPositionMovedForwardTimestamp);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index bfafdf89ed6..870054ecaf6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -40,8 +42,10 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -728,6 +732,21 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
         consumer4.close();
     }
 
+    /**
+     * Before sending a message, wait for the transaction buffer recovery to 
complete;
+     * otherwise the max read position will not move forward when the message 
is sent,
+     * lastMaxReadPositionMovedForwardTimestamp will not be updated, and the 
replication will not be triggered.
+     * @param topicName
+     * @throws Exception
+     */
+    private void waitTBRecoverComplete(PulsarService pulsarService, String 
topicName) throws Exception {
+        TopicTransactionBufferState buffer = (TopicTransactionBufferState) 
((PersistentTopic) pulsarService.getBrokerService()
+                .getTopic(topicName, 
false).get().get()).getTransactionBuffer();
+        Field stateField = 
TopicTransactionBufferState.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        Awaitility.await().until(() -> 
!stateField.get(buffer).toString().equals("Initializing"));
+    }
+
     /**
      * Tests replicated subscriptions when replicator producer is closed
      */
@@ -755,6 +774,9 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
                     .subscribe();
 
             // send one message to trigger replication
+            if (config1.isTransactionCoordinatorEnabled()) {
+                waitTBRecoverComplete(pulsar1, topicName);
+            }
             @Cleanup
             Producer<byte[]> producer = client1.newProducer().topic(topicName)
                     .enableBatching(false)
@@ -916,6 +938,9 @@ public class ReplicatorSubscriptionTest extends 
ReplicatorTestBase {
                 .statsInterval(0, TimeUnit.SECONDS).build();
 
         Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topicName).create();
+        if (config1.isTransactionCoordinatorEnabled()) {
+            waitTBRecoverComplete(pulsar1, topicName);
+        }
         producer.newMessage().key("K1").value("V1").send();
         producer.newMessage().key("K1").value("V2").send();
         producer.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
new file mode 100644
index 00000000000..2d348f82597
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TransactionalReplicateSubscriptionTest extends ReplicatorTestBase 
{
+    @Override
+    @BeforeClass(timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+        
admin1.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        createTransactionCoordinatorAssign(16, pulsar1);
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    /**
+     * enable transaction coordinator for the cluster1
+     */
+    @Override
+    public void setConfig1DefaultValue(){
+        super.setConfig1DefaultValue();
+        config1.setTransactionCoordinatorEnabled(true);
+    }
+
+    protected void createTransactionCoordinatorAssign(int numPartitionsOfTC, 
PulsarService pulsarService) throws MetadataStoreException {
+        pulsarService.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(numPartitionsOfTC));
+    }
+
+    /**
+     * Test replicated subscription with transaction.
+     * @throws Exception
+     */
+    @Test
+    public void 
testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() throws 
Exception {
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ namespace + "/tp_");
+        final String subscriptionName = "s1";
+        final boolean isReplicatedSubscription = true;
+        final int messagesCount = 20;
+        final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
+        final Set<String> receivedMessages = Collections.synchronizedSet(new 
LinkedHashSet<>());
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin1.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest, isReplicatedSubscription);
+        final PersistentTopic topic1 =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // Send messages
+        // Wait for the topic created on the cluster2.
+        // Wait for the snapshot created.
+        final PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString()).enableTransaction(true).build();
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+        Consumer<String> consumer1 = 
client1.newConsumer(Schema.STRING).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+        Transaction txn1 = client1.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        for (int i = 0; i < messagesCount / 2; i++) {
+            String msg = i + "";
+            producer1.newMessage(txn1).value(msg).send();
+            sentMessages.add(msg);
+        }
+        txn1.commit().get();
+        Awaitility.await().untilAsserted(() -> {
+            ConcurrentOpenHashMap<String, ? extends Replicator> replicators = 
topic1.getReplicators();
+            assertTrue(replicators != null && replicators.size() == 1, 
"Replicator should started");
+            assertTrue(replicators.values().iterator().next().isConnected(), 
"Replicator should be connected");
+            
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
+                    "One snapshot should be finished");
+        });
+        final PersistentTopic topic2 =
+                (PersistentTopic) 
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        Awaitility.await().untilAsserted(() -> {
+            
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
+                    "Replicated subscription controller should created");
+        });
+        Transaction txn2 = client1.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        for (int i = messagesCount / 2; i < messagesCount; i++) {
+            String msg = i + "";
+            producer1.newMessage(txn2).value(msg).send();
+            sentMessages.add(msg);
+        }
+        txn2.commit().get();
+
+        // Consume half of the messages and wait for the subscription to be 
created on cluster2.
+        for (int i = 0; i < messagesCount / 2; i++){
+            Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
+            if (message == null) {
+                fail("Should not receive null.");
+            }
+            receivedMessages.add(message.getValue());
+            consumer1.acknowledge(message);
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(topic2.getSubscriptions().get(subscriptionName), 
"Subscription should created");
+        });
+
+        // Switch client to cluster2.
+        // Since cluster1 did not crash, all messages will be replicated 
to cluster2.
+        consumer1.close();
+        final PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString()).build();
+        final Consumer consumer2 = 
client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
+                
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+
+        // Verify all messages will be consumed.
+        Awaitility.await().untilAsserted(() -> {
+            while (true) {
+                Message message = consumer2.receive(2, TimeUnit.SECONDS);
+                if (message != null) {
+                    receivedMessages.add(message.getValue().toString());
+                    consumer2.acknowledge(message);
+                } else {
+                    break;
+                }
+            }
+            assertEquals(receivedMessages.size(), sentMessages.size());
+        });
+
+        consumer2.close();
+        producer1.close();
+        client1.close();
+        client2.close();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 32ffd293893..b375ab7d954 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,11 +19,14 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertTrue;
+
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -38,7 +41,9 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -180,6 +185,37 @@ public class TransactionProduceTest extends 
TransactionTestBase {
         log.info("produce and {} test finished.", endAction ? "commit" : 
"abort");
     }
 
+    @Test
+    public void 
testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish() 
throws Exception {
+        final String topic = NAMESPACE1 + 
"/testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish";
+        PulsarClient pulsarClient = this.pulsarClient;
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        PersistentTopic persistentTopic = getTopic(topic);
+        long lastMaxReadPositionMovedForwardTimestamp = 
persistentTopic.getLastMaxReadPositionMovedForwardTimestamp();
+
+        // transactional publish will not update 
lastMaxReadPositionMovedForwardTimestamp
+        producer.newMessage(txn).value("hello world".getBytes()).send();
+        
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() == 
lastMaxReadPositionMovedForwardTimestamp);
+
+        // commit transaction will update 
lastMaxReadPositionMovedForwardTimestamp
+        txn.commit().get();
+        
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() > 
lastMaxReadPositionMovedForwardTimestamp);
+    }
+
+    private PersistentTopic getTopic(String topic) throws ExecutionException, 
InterruptedException {
+        Optional<Topic> optionalTopic = 
getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, true).get();
+        return (PersistentTopic) optionalTopic.get();
+    }
+
     private void checkMessageId(List<CompletableFuture<MessageId>> futureList, 
boolean isFinished) {
         futureList.forEach(messageIdFuture -> {
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 86def029186..fa32a859d13 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1086,7 +1086,7 @@ public class TransactionTest extends TransactionTestBase {
         });
         Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
 
-        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1), 
false);
         Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
 
     }


Reply via email to