This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e99bf9a4a77a90f14956909a849f927fb1052a81 Author: Lari Hotari <[email protected]> AuthorDate: Tue May 14 07:27:41 2024 +0300 [fix][broker] fix replicated subscriptions for transactional messages (#22452) (cherry picked from commit 9fd1b61fc45d06348af0241f002966087f1822a0) # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java --- .../broker/service/persistent/PersistentTopic.java | 26 +-- .../ReplicatedSubscriptionsController.java | 4 +- .../transaction/buffer/TransactionBuffer.java | 3 +- .../buffer/impl/InMemTransactionBuffer.java | 13 +- .../buffer/impl/TopicTransactionBuffer.java | 70 ++++++-- .../buffer/impl/TransactionBufferDisable.java | 13 +- .../pulsar/broker/service/PersistentTopicTest.java | 4 +- .../broker/service/ReplicatorSubscriptionTest.java | 25 +++ .../TransactionalReplicateSubscriptionTest.java | 182 +++++++++++++++++++++ .../broker/transaction/TransactionProduceTest.java | 36 ++++ .../pulsar/broker/transaction/TransactionTest.java | 2 +- 11 files changed, 343 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 472387a0a9b..5eb0600a9fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -128,6 +128,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -266,9 +267,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Getter protected final TransactionBuffer transactionBuffer; + @Getter + private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack = + (oldPosition, newPosition) -> updateMaxReadPositionMovedForwardTimestamp(); - // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic - private volatile long lastDataMessagePublishedTimestamp = 0; + // Record the last time max read position is moved forward, unless it's a marker message. + @Getter + private volatile long lastMaxReadPositionMovedForwardTimestamp = 0; @Getter private final ExecutorService orderedExecutor; @@ -380,7 +385,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } else { this.transactionBuffer = new TransactionBufferDisable(this); } - transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); + transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), true); if (ledger instanceof ShadowManagedLedgerImpl) { shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { @@ -681,6 +686,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } + private void updateMaxReadPositionMovedForwardTimestamp() { + lastMaxReadPositionMovedForwardTimestamp = Clock.systemUTC().millis(); + } + @Override public void addComplete(Position pos, ByteBuf entryData, Object ctx) { PublishContext publishContext = (PublishContext) ctx; @@ -689,12 +698,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); - if (!publishContext.isMarkerMessage()) { - lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); - } - // in order to sync the max position when cursor read entries - transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); + transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry(), + publishContext.isMarkerMessage()); publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); decrementPendingWriteOpsAndCheck(); @@ -4101,10 +4107,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return transactionBuffer.clearSnapshot().thenCompose(__ -> transactionBuffer.closeAsync()); } - public long getLastDataMessagePublishedTimestamp() { - return lastDataMessagePublishedTimestamp; - } - public Optional<TopicName> getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index e011ed8d660..3a796b3e96d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -206,8 +206,8 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P private void startNewSnapshot() { cleanupTimedOutSnapshots(); - if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime - || topic.getLastDataMessagePublishedTimestamp() == 0) { + if (topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime + || topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) { // There was no message written since the last snapshot, we can skip creating a new snapshot if (log.isDebugEnabled()) { log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index 7eb5d6f789c..79eb263473f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -149,8 +149,9 @@ public interface TransactionBuffer { /** * Sync max read position for normal publish. * @param position {@link PositionImpl} the position to sync. + * @param isMarkerMessage whether the message is marker message. */ - void syncMaxReadPositionForNormalPublish(PositionImpl position); + void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage); /** * Get the can read max position. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index 978536c5f4e..bab7b64c608 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; @@ -213,11 +214,17 @@ class InMemTransactionBuffer implements TransactionBuffer { final ConcurrentMap<TxnID, TxnBuffer> buffers; final Map<Long, Set<TxnID>> txnIndex; private final Topic topic; + private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack; public InMemTransactionBuffer(Topic topic) { this.buffers = new ConcurrentHashMap<>(); this.txnIndex = new HashMap<>(); this.topic = topic; + if (topic instanceof PersistentTopic) { + this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack(); + } else { + this.maxReadPositionCallBack = null; + } } @Override @@ -369,8 +376,10 @@ class InMemTransactionBuffer implements TransactionBuffer { } @Override - public void syncMaxReadPositionForNormalPublish(PositionImpl position) { - //no-op + public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { + if (!isMarkerMessage && maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 81c9ecfc728..dfb73815e08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -103,6 +103,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor snapshotAbortedTxnProcessor; private final AbortedTxnProcessor.SnapshotType snapshotType; + private final MaxReadPositionCallBack maxReadPositionCallBack; public TopicTransactionBuffer(PersistentTopic topic) { super(State.None); @@ -120,6 +121,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic); snapshotType = AbortedTxnProcessor.SnapshotType.Single; } + this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack(); this.recover(); } @@ -175,7 +177,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen if (Markers.isTxnAbortMarker(msgMetadata)) { snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position); } - updateMaxReadPosition(txnID); + removeTxnAndUpdateMaxReadPosition(txnID); } else { handleTransactionMessage(txnID, position); } @@ -290,7 +292,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen ongoingTxns.put(txnId, (PositionImpl) position); PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey()); // max read position is less than first ongoing transaction message position - maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition); + updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition), + false); } } @@ -314,7 +317,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @Override public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { - updateMaxReadPosition(txnID); + removeTxnAndUpdateMaxReadPosition(txnID); handleLowWaterMark(txnID, lowWaterMark); snapshotAbortedTxnProcessor.trimExpiredAbortedTxns(); takeSnapshotByChangeTimes(); @@ -361,7 +364,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen public void addComplete(Position position, ByteBuf entryData, Object ctx) { synchronized (TopicTransactionBuffer.this) { snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, (PositionImpl) position); - updateMaxReadPosition(txnID); + removeTxnAndUpdateMaxReadPosition(txnID); snapshotAbortedTxnProcessor.trimExpiredAbortedTxns(); takeSnapshotByChangeTimes(); txnAbortedCounter.increment(); @@ -444,17 +447,39 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); } - void updateMaxReadPosition(TxnID txnID) { - PositionImpl preMaxReadPosition = this.maxReadPosition; + /** + * remove the specified transaction from ongoing transaction list and update the max read position. + * @param txnID + */ + void removeTxnAndUpdateMaxReadPosition(TxnID txnID) { ongoingTxns.remove(txnID); if (!ongoingTxns.isEmpty()) { PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey()); - maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position); + updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false); } else { - maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); + updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false); } - if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { - this.changeMaxReadPositionCount.getAndIncrement(); + } + + /** + * update the max read position. if the new position is greater than the current max read position, + * we will trigger the callback, unless the disableCallback is true. + * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp. + * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need + * to trigger the callback. + * @param newPosition new max read position to update. + * @param disableCallback whether disable the callback. + */ + void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) { + PositionImpl preMaxReadPosition = this.maxReadPosition; + this.maxReadPosition = newPosition; + if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) { + if (!checkIfNoSnapshot()) { + this.changeMaxReadPositionCount.getAndIncrement(); + } + if (!disableCallback) { + maxReadPositionCallBack.maxReadPositionMovedForward(preMaxReadPosition, this.maxReadPosition); + } } } @@ -479,17 +504,22 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID); } + /** + * Sync max read position for normal publish. + * @param position {@link PositionImpl} the position to sync. + * @param isMarkerMessage whether the message is marker message, in such case, we + * don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp. + */ @Override - public void syncMaxReadPositionForNormalPublish(PositionImpl position) { + public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback // thread is the same tread, in this time the lastAddConfirm don't content transaction message. synchronized (TopicTransactionBuffer.this) { if (checkIfNoSnapshot()) { - this.maxReadPosition = position; + updateMaxReadPosition(position, isMarkerMessage); } else if (checkIfReady()) { if (ongoingTxns.isEmpty()) { - maxReadPosition = position; - changeMaxReadPositionCount.incrementAndGet(); + updateMaxReadPosition(position, isMarkerMessage); } } } @@ -674,6 +704,18 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } } + /** + * A functional interface to handle the max read position move forward. + */ + public interface MaxReadPositionCallBack { + /** + * callback method when max read position move forward. + * @param oldPosition the old max read position. + * @param newPosition the new max read position. + */ + void maxReadPositionMovedForward(PositionImpl oldPosition, PositionImpl newPosition); + } + static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { private final AtomicLong outstandingReadsRequests = new AtomicLong(0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 9de0888ae5b..ebd61dbaa82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; @@ -42,8 +43,14 @@ import org.apache.pulsar.common.util.FutureUtil; public class TransactionBufferDisable implements TransactionBuffer { private final Topic topic; + private final TopicTransactionBuffer.MaxReadPositionCallBack maxReadPositionCallBack; public TransactionBufferDisable(Topic topic) { this.topic = topic; + if (topic instanceof PersistentTopic) { + this.maxReadPositionCallBack = ((PersistentTopic) topic).getMaxReadPositionCallBack(); + } else { + this.maxReadPositionCallBack = null; + } } @Override @@ -91,8 +98,10 @@ public class TransactionBufferDisable implements TransactionBuffer { } @Override - public void syncMaxReadPositionForNormalPublish(PositionImpl position) { - //no-op + public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { + if (!isMarkerMessage && maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 412dfb381c8..bf57586e517 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -298,6 +298,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any()); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp(); + /* * MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder(); * messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setProducerName("producer-name"); @@ -322,10 +324,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { assertEquals(entryData.array(), payload.array()); } }; - topic.publishMessage(payload, publishContext); assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index bfafdf89ed6..870054ecaf6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -26,6 +26,8 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; + +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -40,8 +42,10 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -728,6 +732,21 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { consumer4.close(); } + /** + * before sending message, we should wait for transaction buffer recover complete, + * or the MaxReadPosition will not move forward when the message is sent, and the + * MaxReadPositionMovedForwardTimestamp will not be updated, then the replication will not be triggered. + * @param topicName + * @throws Exception + */ + private void waitTBRecoverComplete(PulsarService pulsarService, String topicName) throws Exception { + TopicTransactionBufferState buffer = (TopicTransactionBufferState) ((PersistentTopic) pulsarService.getBrokerService() + .getTopic(topicName, false).get().get()).getTransactionBuffer(); + Field stateField = TopicTransactionBufferState.class.getDeclaredField("state"); + stateField.setAccessible(true); + Awaitility.await().until(() -> !stateField.get(buffer).toString().equals("Initializing")); + } + /** * Tests replicated subscriptions when replicator producer is closed */ @@ -755,6 +774,9 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { .subscribe(); // send one message to trigger replication + if (config1.isTransactionCoordinatorEnabled()) { + waitTBRecoverComplete(pulsar1, topicName); + } @Cleanup Producer<byte[]> producer = client1.newProducer().topic(topicName) .enableBatching(false) @@ -916,6 +938,9 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase { .statsInterval(0, TimeUnit.SECONDS).build(); Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create(); + if (config1.isTransactionCoordinatorEnabled()) { + waitTBRecoverComplete(pulsar1, topicName); + } producer.newMessage().key("K1").value("V1").send(); producer.newMessage().key("K1").value("V2").send(); producer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java new file mode 100644 index 00000000000..2d348f82597 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TransactionalReplicateSubscriptionTest extends ReplicatorTestBase { + @Override + @BeforeClass(timeOut = 300000) + public void setup() throws Exception { + super.setup(); + admin1.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + createTransactionCoordinatorAssign(16, pulsar1); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + /** + * enable transaction coordinator for the cluster1 + */ + @Override + public void setConfig1DefaultValue(){ + super.setConfig1DefaultValue(); + config1.setTransactionCoordinatorEnabled(true); + } + + protected void createTransactionCoordinatorAssign(int numPartitionsOfTC, PulsarService pulsarService) throws MetadataStoreException { + pulsarService.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(numPartitionsOfTC)); + } + + /** + * Test replicated subscription with transaction. + * @throws Exception + */ + @Test + public void testReplicatedSubscribeAndSwitchToStandbyClusterWithTransaction() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp_"); + final String subscriptionName = "s1"; + final boolean isReplicatedSubscription = true; + final int messagesCount = 20; + final LinkedHashSet<String> sentMessages = new LinkedHashSet<>(); + final Set<String> receivedMessages = Collections.synchronizedSet(new LinkedHashSet<>()); + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.topics().createNonPartitionedTopic(topicName); + admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest, isReplicatedSubscription); + final PersistentTopic topic1 = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // Send messages + // Wait for the topic created on the cluster2. + // Wait for the snapshot created. + final PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).enableTransaction(true).build(); + Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).enableBatching(false).create(); + Consumer<String> consumer1 = client1.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); + Transaction txn1 = client1.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + for (int i = 0; i < messagesCount / 2; i++) { + String msg = i + ""; + producer1.newMessage(txn1).value(msg).send(); + sentMessages.add(msg); + } + txn1.commit().get(); + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators(); + assertTrue(replicators != null && replicators.size() == 1, "Replicator should started"); + assertTrue(replicators.values().iterator().next().isConnected(), "Replicator should be connected"); + assertTrue(topic1.getReplicatedSubscriptionController().get().getLastCompletedSnapshotId().isPresent(), + "One snapshot should be finished"); + }); + final PersistentTopic topic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + Awaitility.await().untilAsserted(() -> { + assertTrue(topic2.getReplicatedSubscriptionController().isPresent(), + "Replicated subscription controller should created"); + }); + Transaction txn2 = client1.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + for (int i = messagesCount / 2; i < messagesCount; i++) { + String msg = i + ""; + producer1.newMessage(txn2).value(msg).send(); + sentMessages.add(msg); + } + txn2.commit().get(); + + // Consume half messages and wait the subscription created on the cluster2. + for (int i = 0; i < messagesCount / 2; i++){ + Message<String> message = consumer1.receive(2, TimeUnit.SECONDS); + if (message == null) { + fail("Should not receive null."); + } + receivedMessages.add(message.getValue()); + consumer1.acknowledge(message); + } + Awaitility.await().untilAsserted(() -> { + assertNotNull(topic2.getSubscriptions().get(subscriptionName), "Subscription should created"); + }); + + // Switch client to cluster2. + // Since the cluster1 was not crash, all messages will be replicated to the cluster2. + consumer1.close(); + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); + final Consumer consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName) + .subscriptionName(subscriptionName).replicateSubscriptionState(isReplicatedSubscription).subscribe(); + + // Verify all messages will be consumed. + Awaitility.await().untilAsserted(() -> { + while (true) { + Message message = consumer2.receive(2, TimeUnit.SECONDS); + if (message != null) { + receivedMessages.add(message.getValue().toString()); + consumer2.acknowledge(message); + } else { + break; + } + } + assertEquals(receivedMessages.size(), sentMessages.size()); + }); + + consumer2.close(); + producer1.close(); + client1.close(); + client2.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 32ffd293893..b375ab7d954 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -19,11 +19,14 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertTrue; + import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -38,7 +41,9 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -180,6 +185,37 @@ public class TransactionProduceTest extends TransactionTestBase { log.info("produce and {} test finished.", endAction ? "commit" : "abort"); } + @Test + public void testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish() throws Exception { + final String topic = NAMESPACE1 + "/testUpdateLastMaxReadPositionMovedForwardTimestampForTransactionalPublish"; + PulsarClient pulsarClient = this.pulsarClient; + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + PersistentTopic persistentTopic = getTopic(topic); + long lastMaxReadPositionMovedForwardTimestamp = persistentTopic.getLastMaxReadPositionMovedForwardTimestamp(); + + // transactional publish will not update lastMaxReadPositionMovedForwardTimestamp + producer.newMessage(txn).value("hello world".getBytes()).send(); + assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() == lastMaxReadPositionMovedForwardTimestamp); + + // commit transaction will update lastMaxReadPositionMovedForwardTimestamp + txn.commit().get(); + assertTrue(persistentTopic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp); + } + + private PersistentTopic getTopic(String topic) throws ExecutionException, InterruptedException { + Optional<Topic> optionalTopic = getPulsarServiceList().get(0).getBrokerService() + .getTopic(topic, true).get(); + return (PersistentTopic) optionalTopic.get(); + } + private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boolean isFinished) { futureList.forEach(messageIdFuture -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 86def029186..fa32a859d13 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1086,7 +1086,7 @@ public class TransactionTest extends TransactionTestBase { }); Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); - buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1)); + buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1), false); Assert.assertEquals(changeMaxReadPositionCount.get(), 0L); }
