This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9fd1b61fc45 [fix][broker] fix replicated subscriptions for
transactional messages (#22452)
9fd1b61fc45 is described below
commit 9fd1b61fc45d06348af0241f002966087f1822a0
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon May 13 20:04:55 2024 +0800
[fix][broker] fix replicated subscriptions for transactional messages
(#22452)
---
.../broker/service/persistent/PersistentTopic.java | 21 ++-
.../ReplicatedSubscriptionsController.java | 4 +-
.../transaction/buffer/TransactionBuffer.java | 3 +-
.../buffer/impl/InMemTransactionBuffer.java | 13 +-
.../buffer/impl/TopicTransactionBuffer.java | 70 ++++++--
.../buffer/impl/TransactionBufferDisable.java | 13 +-
.../pulsar/broker/service/PersistentTopicTest.java | 4 +-
.../broker/service/ReplicatorSubscriptionTest.java | 25 +++
.../TransactionalReplicateSubscriptionTest.java | 182 +++++++++++++++++++++
.../broker/transaction/TransactionProduceTest.java | 36 ++++
.../pulsar/broker/transaction/TransactionTest.java | 2 +-
11 files changed, 342 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 28bc27f7961..69c7f404fdd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -134,6 +134,7 @@ import
org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -272,10 +273,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Getter
protected final TransactionBuffer transactionBuffer;
+ @Getter
+ private final TopicTransactionBuffer.MaxReadPositionCallBack
maxReadPositionCallBack =
+ (oldPosition, newPosition) ->
updateMaxReadPositionMovedForwardTimestamp();
- // Record the last time a data message (ie: not an internal Pulsar marker)
is published on the topic
+ // Record the last time max read position is moved forward, unless it's a
marker message.
@Getter
- private volatile long lastDataMessagePublishedTimestamp = 0;
+ private volatile long lastMaxReadPositionMovedForwardTimestamp = 0;
@Getter
private final ExecutorService orderedExecutor;
@@ -410,7 +414,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
} else {
this.transactionBuffer = new TransactionBufferDisable(this);
}
- transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry());
+ transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
shadowSourceTopic =
TopicName.get(ledger.getConfig().getShadowSource());
} else {
@@ -719,6 +723,10 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}
+ private void updateMaxReadPositionMovedForwardTimestamp() {
+ lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis();
+ }
+
@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
@@ -727,12 +735,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
- if (!publishContext.isMarkerMessage()) {
- lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
- }
-
// in order to sync the max position when cursor read entries
- transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry());
+ transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry(),
+ publishContext.isMarkerMessage());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(),
position.getEntryId());
decrementPendingWriteOpsAndCheck();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
index e011ed8d660..3a796b3e96d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
@@ -206,8 +206,8 @@ public class ReplicatedSubscriptionsController implements
AutoCloseable, Topic.P
private void startNewSnapshot() {
cleanupTimedOutSnapshots();
- if (topic.getLastDataMessagePublishedTimestamp() <
lastCompletedSnapshotStartTime
- || topic.getLastDataMessagePublishedTimestamp() == 0) {
+ if (topic.getLastMaxReadPositionMovedForwardTimestamp() <
lastCompletedSnapshotStartTime
+ || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
// There was no message written since the last snapshot, we can
skip creating a new snapshot
if (log.isDebugEnabled()) {
log.debug("[{}] There is no new data in topic. Skipping
snapshot creation.", topic.getName());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 3fe989acc92..092638abf5b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -148,8 +148,9 @@ public interface TransactionBuffer {
/**
* Sync max read position for normal publish.
* @param position {@link PositionImpl} the position to sync.
+ * @param isMarkerMessage whether the message is a marker message.
*/
- void syncMaxReadPositionForNormalPublish(PositionImpl position);
+ void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean
isMarkerMessage);
/**
* Get the can read max position.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 978536c5f4e..bab7b64c608 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -213,11 +214,17 @@ class InMemTransactionBuffer implements TransactionBuffer
{
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;
+ private final TopicTransactionBuffer.MaxReadPositionCallBack
maxReadPositionCallBack;
public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
this.topic = topic;
+ if (topic instanceof PersistentTopic) {
+ this.maxReadPositionCallBack = ((PersistentTopic)
topic).getMaxReadPositionCallBack();
+ } else {
+ this.maxReadPositionCallBack = null;
+ }
}
@Override
@@ -369,8 +376,10 @@ class InMemTransactionBuffer implements TransactionBuffer {
}
@Override
- public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
- //no-op
+ public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
+ if (!isMarkerMessage && maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 81c9ecfc728..dfb73815e08 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -103,6 +103,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
private final AbortedTxnProcessor.SnapshotType snapshotType;
+ private final MaxReadPositionCallBack maxReadPositionCallBack;
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
@@ -120,6 +121,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
snapshotAbortedTxnProcessor = new
SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
+ this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}
@@ -175,7 +177,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
if (Markers.isTxnAbortMarker(msgMetadata))
{
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
- updateMaxReadPosition(txnID);
+ removeTxnAndUpdateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
@@ -290,7 +292,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition =
ongoingTxns.get(ongoingTxns.firstKey());
// max read position is less than first ongoing transaction
message position
- maxReadPosition = ((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(firstPosition);
+ updateMaxReadPosition(((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(firstPosition),
+ false);
}
}
@@ -314,7 +317,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
- updateMaxReadPosition(txnID);
+ removeTxnAndUpdateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
@@ -361,7 +364,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl)
position);
- updateMaxReadPosition(txnID);
+ removeTxnAndUpdateMaxReadPosition(txnID);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
txnAbortedCounter.increment();
@@ -444,17 +447,39 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}
- void updateMaxReadPosition(TxnID txnID) {
- PositionImpl preMaxReadPosition = this.maxReadPosition;
+ /**
+ * Removes the specified transaction from the ongoing transaction list and
updates the max read position.
+ * @param txnID the ID of the transaction to remove.
+ */
+ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
- maxReadPosition = ((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(position);
+ updateMaxReadPosition(((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(position), false);
} else {
- maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
+ updateMaxReadPosition((PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry(), false);
}
- if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
- this.changeMaxReadPositionCount.getAndIncrement();
+ }
+
+ /**
+ * Updates the max read position. If the new position is greater than the
current max read position,
+ * the callback is triggered, unless disableCallback is true.
+ * Currently, the callback is only used to update
lastMaxReadPositionMovedForwardTimestamp.
+ * With non-transactional publishing, some marker messages are sent to the
topic; in that case the callback
+ * does not need to be triggered.
+ * @param newPosition the new max read position.
+ * @param disableCallback whether to disable the callback.
+ */
+ void updateMaxReadPosition(PositionImpl newPosition, boolean
disableCallback) {
+ PositionImpl preMaxReadPosition = this.maxReadPosition;
+ this.maxReadPosition = newPosition;
+ if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+ if (!checkIfNoSnapshot()) {
+ this.changeMaxReadPositionCount.getAndIncrement();
+ }
+ if (!disableCallback) {
+
maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition,
this.maxReadPosition);
+ }
}
}
@@ -479,17 +504,22 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}
+ /**
+ * Sync max read position for normal publish.
+ * @param position {@link PositionImpl} the position to sync.
+ * @param isMarkerMessage whether the message is a marker message; if so,
there is
+ * no need to trigger the callback that updates
lastMaxReadPositionMovedForwardTimestamp.
+ */
@Override
- public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+ public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
// When there are no ongoing transactions, lastAddConfirm is the max
readable position: the callback
// thread is the same thread, so at this time lastAddConfirm does not
contain a transaction message.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
- this.maxReadPosition = position;
+ updateMaxReadPosition(position, isMarkerMessage);
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
- maxReadPosition = position;
- changeMaxReadPositionCount.incrementAndGet();
+ updateMaxReadPosition(position, isMarkerMessage);
}
}
}
@@ -674,6 +704,18 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
+ /**
+ * A functional interface for handling forward moves of the max read position.
+ */
+ public interface MaxReadPositionCallBack {
+ /**
+ * Callback invoked when the max read position moves forward.
+ * @param oldPosition the old max read position.
+ * @param newPosition the new max read position.
+ */
+ void maxReadPositionMovedForward(PositionImpl oldPosition,
PositionImpl newPosition);
+ }
+
static class FillEntryQueueCallback implements
AsyncCallbacks.ReadEntriesCallback {
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 9de0888ae5b..ebd61dbaa82 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -42,8 +43,14 @@ import org.apache.pulsar.common.util.FutureUtil;
public class TransactionBufferDisable implements TransactionBuffer {
private final Topic topic;
+ private final TopicTransactionBuffer.MaxReadPositionCallBack
maxReadPositionCallBack;
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
+ if (topic instanceof PersistentTopic) {
+ this.maxReadPositionCallBack = ((PersistentTopic)
topic).getMaxReadPositionCallBack();
+ } else {
+ this.maxReadPositionCallBack = null;
+ }
}
@Override
@@ -91,8 +98,10 @@ public class TransactionBufferDisable implements
TransactionBuffer {
}
@Override
- public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
- //no-op
+ public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
+ if (!isMarkerMessage && maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index de9d0272fc0..1118b71456e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -298,6 +298,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ long lastMaxReadPositionMovedForwardTimestamp =
topic.getLastMaxReadPositionMovedForwardTimestamp();
+
/*
* MessageMetadata.Builder messageMetadata =
MessageMetadata.newBuilder();
* messageMetadata.setPublishTime(System.currentTimeMillis());
messageMetadata.setProducerName("producer-name");
@@ -322,10 +324,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertEquals(entryData.array(), payload.array());
}
};
-
topic.publishMessage(payload, publishContext);
assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() >
lastMaxReadPositionMovedForwardTimestamp);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index 25b09f96549..647b7b28281 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -26,6 +26,8 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
+
+import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -40,8 +42,10 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
+import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -728,6 +732,21 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
consumer4.close();
}
+ /**
+ * Before sending a message, we must wait for the transaction buffer recovery
to complete;
+ * otherwise the MaxReadPosition will not move forward when the message is
sent, the
+ * MaxReadPositionMovedForwardTimestamp will not be updated, and the
replication will not be triggered.
+ * @param topicName the name of the topic whose transaction buffer is awaited
+ * @throws Exception
+ * @throws Exception
+ */
+ private void waitTBRecoverComplete(PulsarService pulsarService, String
topicName) throws Exception {
+ TopicTransactionBufferState buffer = (TopicTransactionBufferState)
((PersistentTopic) pulsarService.getBrokerService()
+ .getTopic(topicName,
false).get().get()).getTransactionBuffer();
+ Field stateField =
TopicTransactionBufferState.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+ Awaitility.await().until(() ->
!stateField.get(buffer).toString().equals("Initializing"));
+ }
+
/**
* Tests replicated subscriptions when replicator producer is closed
*/
@@ -755,6 +774,9 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
.subscribe();
// send one message to trigger replication
+ if (config1.isTransactionCoordinatorEnabled()) {
+ waitTBRecoverComplete(pulsar1, topicName);
+ }
@Cleanup
Producer<byte[]> producer = client1.newProducer().topic(topicName)
.enableBatching(false)
@@ -917,6 +939,9 @@ public class ReplicatorSubscriptionTest extends
ReplicatorTestBase {
.statsInterval(0, TimeUnit.SECONDS).build();
Producer<String> producer =
client.newProducer(Schema.STRING).topic(topicName).create();
+ if (config1.isTransactionCoordinatorEnabled()) {
+ waitTBRecoverComplete(pulsar1, topicName);
+ }
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
new file mode 100644
index 00000000000..2d348f82597
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TransactionalReplicateSubscriptionTest extends ReplicatorTestBase
{
+ @Override
+ @BeforeClass(timeOut = 300000)
+ public void setup() throws Exception {
+ super.setup();
+
admin1.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+ createTransactionCoordinatorAssign(16, pulsar1);
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ /**
+ * Enable the transaction coordinator for cluster1.
+ */
+ @Override
+ public void setConfig1DefaultValue(){
+ super.setConfig1DefaultValue();
+ config1.setTransactionCoordinatorEnabled(true);
+ }
+
+ protected void createTransactionCoordinatorAssign(int numPartitionsOfTC,
PulsarService pulsarService) throws MetadataStoreException {
+ pulsarService.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(numPartitionsOfTC));
+ }
+
+ /**
+ * Test replicated subscription with transaction.
+ * @throws Exception
+ */
+ @Test
+ public void
testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() throws
Exception {
+ final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_");
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/tp_");
+ final String subscriptionName = "s1";
+ final boolean isReplicatedSubscription = true;
+ final int messagesCount = 20;
+ final LinkedHashSet<String> sentMessages = new LinkedHashSet<>();
+ final Set<String> receivedMessages = Collections.synchronizedSet(new
LinkedHashSet<>());
+ admin1.namespaces().createNamespace(namespace);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace,
Sets.newHashSet("r1", "r2"));
+ admin1.topics().createNonPartitionedTopic(topicName);
+ admin1.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest, isReplicatedSubscription);
+ final PersistentTopic topic1 =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // Send messages
+ // Wait for the topic created on the cluster2.
+ // Wait for the snapshot created.
+ final PulsarClient client1 =
PulsarClient.builder().serviceUrl(url1.toString()).enableTransaction(true).build();
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create();
+ Consumer<String> consumer1 =
client1.newConsumer(Schema.STRING).topic(topicName)
+
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+ Transaction txn1 = client1.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ for (int i = 0; i < messagesCount / 2; i++) {
+ String msg = i + "";
+ producer1.newMessage(txn1).value(msg).send();
+ sentMessages.add(msg);
+ }
+ txn1.commit().get();
+ Awaitility.await().untilAsserted(() -> {
+ ConcurrentOpenHashMap<String, ? extends Replicator> replicators =
topic1.getReplicators();
+ assertTrue(replicators != null && replicators.size() == 1,
"Replicator should started");
+ assertTrue(replicators.values().iterator().next().isConnected(),
"Replicator should be connected");
+
assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(),
+ "One snapshot should be finished");
+ });
+ final PersistentTopic topic2 =
+ (PersistentTopic)
pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+
assertTrue(topic2.getReplicatedSubscriptionController().isPresent(),
+ "Replicated subscription controller should created");
+ });
+ Transaction txn2 = client1.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ for (int i = messagesCount / 2; i < messagesCount; i++) {
+ String msg = i + "";
+ producer1.newMessage(txn2).value(msg).send();
+ sentMessages.add(msg);
+ }
+ txn2.commit().get();
+
+ // Consume half of the messages and wait for the subscription to be
created on cluster2.
+ for (int i = 0; i < messagesCount / 2; i++){
+ Message<String> message = consumer1.receive(2, TimeUnit.SECONDS);
+ if (message == null) {
+ fail("Should not receive null.");
+ }
+ receivedMessages.add(message.getValue());
+ consumer1.acknowledge(message);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertNotNull(topic2.getSubscriptions().get(subscriptionName),
"Subscription should created");
+ });
+
+ // Switch client to cluster2.
+ // Since cluster1 did not crash, all messages will be replicated to
cluster2.
+ consumer1.close();
+ final PulsarClient client2 =
PulsarClient.builder().serviceUrl(url2.toString()).build();
+ final Consumer consumer2 =
client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
+
.subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe();
+
+ // Verify all messages will be consumed.
+ Awaitility.await().untilAsserted(() -> {
+ while (true) {
+ Message message = consumer2.receive(2, TimeUnit.SECONDS);
+ if (message != null) {
+ receivedMessages.add(message.getValue().toString());
+ consumer2.acknowledge(message);
+ } else {
+ break;
+ }
+ }
+ assertEquals(receivedMessages.size(), sentMessages.size());
+ });
+
+ consumer2.close();
+ producer1.close();
+ client1.close();
+ client2.close();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 32ffd293893..b375ab7d954 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -19,11 +19,14 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertTrue;
+
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -38,7 +41,9 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -180,6 +185,37 @@ public class TransactionProduceTest extends
TransactionTestBase {
log.info("produce and {} test finished.", endAction ? "commit" :
"abort");
}
+ @Test
+ public void
testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish()
throws Exception {
+ final String topic = NAMESPACE1 +
"/testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish";
+ PulsarClient pulsarClient = this.pulsarClient;
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+ PersistentTopic persistentTopic = getTopic(topic);
+ long lastMaxReadPositionMovedForwardTimestamp =
persistentTopic.getLastMaxReadPositionMovedForwardTimestamp();
+
+ // transactional publish will not update
lastMaxReadPositionMovedForwardTimestamp
+ producer.newMessage(txn).value("hello world".getBytes()).send();
+
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() ==
lastMaxReadPositionMovedForwardTimestamp);
+
+ // commit transaction will update
lastMaxReadPositionMovedForwardTimestamp
+ txn.commit().get();
+
assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() >
lastMaxReadPositionMovedForwardTimestamp);
+ }
+
+ private PersistentTopic getTopic(String topic) throws ExecutionException,
InterruptedException {
+ Optional<Topic> optionalTopic =
getPulsarServiceList().get(0).getBrokerService()
+ .getTopic(topic, true).get();
+ return (PersistentTopic) optionalTopic.get();
+ }
+
private void checkMessageId(List<CompletableFuture<MessageId>> futureList,
boolean isFinished) {
futureList.forEach(messageIdFuture -> {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 5e806bb9cee..55a3e098965 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1119,7 +1119,7 @@ public class TransactionTest extends TransactionTestBase {
});
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
- buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+ buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1),
false);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);
}