This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 393f95cd419bd579f24fc2b5f9cd0cb8f085092c Author: congbo <[email protected]> AuthorDate: Sat Nov 13 09:15:30 2021 +0800 [Transaction] Fix transaction system topic create in loop. (#12749) fix https://github.com/apache/pulsar/issues/12727 ### Motivation Now transaction system topic can be created. ### Modifications we should not allow broker or user create by transaction system format topic. 1. checkout topic auto create. 2. admin create topic. ### Verifying this change add some test for it (cherry picked from commit 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0) --- .../pulsar/broker/transaction/TransactionTest.java | 57 ++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 34ea362..a851c14 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -180,6 +180,63 @@ public class TransactionTest extends TransactionTestBase { } @Test + public void testCreateTransactionSystemTopic() throws Exception { + String subName = "test"; + String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString(); + + try { + // init pending ack + @Cleanup + Consumer<byte[]> consumer = getConsumer(topicName, subName); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + } + topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName); + + // getList does not include transaction system topic + List<String> list = admin.topics().getList(NAMESPACE1); + assertEquals(list.size(), 4); + list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX))); + + try { + // can't create transaction system topic + @Cleanup + Consumer<byte[]> consumer = getConsumer(topicName, subName); + fail(); + } catch (PulsarClientException.NotAllowedException e) { + assertTrue(e.getMessage().contains("Can not create transaction system topic")); + } + + // can't create transaction system topic + try { + admin.topics().getSubscriptions(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName); + } + + // can't create transaction system topic + try { + admin.topics().createPartitionedTopic(topicName, 3); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + + // can't create transaction system topic + try { + admin.topics().createNonPartitionedTopic(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + } + + @Test public void brokerNotInitTxnManagedLedgerTopic() throws Exception { String subName = "test";
