This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eba93cf7254e22fda22d1ef0574b7b915d027db7 Author: congbo <[email protected]> AuthorDate: Fri Jul 1 20:05:20 2022 +0800 [fix][txn] Fix TopicTransactionBuffer ledger append marker throw ManagedLedgerAlreadyClosedException (#16265) * [fix][txn] Fix TopicTransactionBuffer ledger append marker throw ManagedLedgerAlreadyClosedException (cherry picked from commit e616e2cb388d9ff7282653c4976a32e89d5cccae) --- .../buffer/impl/TopicTransactionBuffer.java | 8 ++ .../buffer/TopicTransactionBufferTest.java | 89 ++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 67b7a24d0df..791cd39ee0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -321,6 +321,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @Override public void addFailed(ManagedLedgerException exception, Object ctx) { log.error("Failed to commit for txn {}", txnID, exception); + checkAppendMarkerException(exception); completableFuture.completeExceptionally(new PersistenceException(exception)); } }, null); @@ -367,6 +368,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @Override public void addFailed(ManagedLedgerException exception, Object ctx) { log.error("Failed to abort for txn {}", txnID, exception); + checkAppendMarkerException(exception); completableFuture.completeExceptionally(new PersistenceException(exception)); } }, null); @@ -381,6 +383,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen return completableFuture; } + private void checkAppendMarkerException(ManagedLedgerException exception) 
{ + if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) { + topic.getManagedLedger().readyToCreateNewLedger(); + } + } + private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> { if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java new file mode 100644 index 00000000000..576ef647248 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.buffer; + +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; +import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class TopicTransactionBufferTest extends TransactionTestBase { + + + @BeforeMethod(alwaysRun = true) + protected void setup() throws Exception { + setBrokerCount(1); + setUpBase(1, 16, "persistent://" + NAMESPACE1 + "/test", 0); + + Map<TransactionCoordinatorID, TransactionMetadataStore> stores = + getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores(); + Awaitility.await().until(() -> { + if (stores.size() == 16) { + for (TransactionCoordinatorID transactionCoordinatorID : stores.keySet()) { + if (((MLTransactionMetadataStore) stores.get(transactionCoordinatorID)).getState() + != TransactionMetadataStoreState.State.Ready) { + return false; + } + } + return true; + } else { + return false; + } + }); + } + + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { + final String topic = "persistent://" + 
NAMESPACE1 + "/testPendingAckManageLedgerWriteFailState"; + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .create(); + + producer.newMessage(txn).value("test".getBytes()).send(); + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); + Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed); + txn.commit().get(); + } +}
