This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 914f172592c6d205cc380ea3edb76e0f03904e0b Author: Xiangying Meng <[email protected]> AuthorDate: Fri Dec 17 14:47:12 2021 +0800 [Transaction] Delete the redundant code (#13327) The problem was resolved, so there is no need to add a wait and retry method again. 1. Delete the redundant code 2. Optimize some code form (cherry picked from commit fbe010323076ba2339e2339e3031a78e20b09061) --- .../broker/transaction/TransactionTestBase.java | 14 ----- .../pulsar/testclient/PerformanceConsumer.java | 10 ++-- .../pulsar/testclient/PerformanceProducer.java | 12 +++-- .../pulsar/testclient/PerformanceTransaction.java | 20 +++++--- .../pulsar/testclient/utils/PerformanceUtils.java | 59 ---------------------- .../testclient/PerformanceTransactionTest.java | 23 ++++----- 6 files changed, 38 insertions(+), 100 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index e13365d..fe7a813 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -61,8 +61,6 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.MockZooKeeperSession; import org.apache.zookeeper.ZooKeeper; -import org.awaitility.Awaitility; -import org.testng.Assert; @Slf4j public abstract class TransactionTestBase extends TestRetrySupport { @@ -144,8 +142,6 @@ public abstract class TransactionTestBase extends TestRetrySupport { .statsInterval(0, TimeUnit.SECONDS) .enableTransaction(true) .build(); - // wait tc init success to ready state - waitForCoordinatorToBeAvailable(numPartitionsOfTC); } protected void startBroker() throws Exception { @@ -332,14 +328,4 @@ public abstract class TransactionTestBase extends TestRetrySupport { log.warn("Failed to clean up mocked pulsar service:", e); } } - public void waitForCoordinatorToBeAvailable(int numOfTCPerBroker){ - // wait tc init success to ready state - Awaitility.await() - .untilAsserted(() -> { - int transactionMetaStoreCount = pulsarServiceList.stream() - .mapToInt(pulsarService -> pulsarService.getTransactionMetadataStoreService().getStores().size()) - .sum(); - Assert.assertEquals(transactionMetaStoreCount, numOfTCPerBroker); - }); - } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 9c15a0e..3c7bed9 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -20,7 +20,6 @@ package org.apache.pulsar.testclient; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -362,8 +361,13 @@ public class PerformanceConsumer { } PulsarClient pulsarClient = clientBuilder.build(); - AtomicReference<Transaction> atomicReference = buildTransaction(pulsarClient, arguments.isEnableTransaction, - arguments.transactionTimeout); + AtomicReference<Transaction> atomicReference; + if (arguments.isEnableTransaction) { + atomicReference = new AtomicReference<>(pulsarClient.newTransaction() + .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS).build().get()); + } else { + atomicReference = new AtomicReference<>(null); + } AtomicLong messageAckedCount = new AtomicLong(); Semaphore messageReceiveLimiter = new Semaphore(arguments.numMessagesPerTransaction); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 20eb8f3..0e3d550 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -74,7 +74,6 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; import static org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES; -import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -422,7 +421,7 @@ public class PerformanceProducer { clientBuilder.allowTlsInsecureConnection(arguments.tlsAllowInsecureConnection); } - try (PulsarAdmin client = clientBuilder.build();) { + try (PulsarAdmin client = clientBuilder.build()) { for (String topic : arguments.topics) { log.info("Creating partitioned topic {} with {} partitions", topic, arguments.partitions); try { @@ -592,8 +591,15 @@ public class PerformanceProducer { // enable round robin message routing if it is a partitioned topic .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + AtomicReference<Transaction> transactionAtomicReference; if (arguments.isEnableTransaction) { producerBuilder.sendTimeout(0, TimeUnit.SECONDS); + transactionAtomicReference = new AtomicReference<>(client.newTransaction() + .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS) + .build() + .get()); + } else { + transactionAtomicReference = new AtomicReference<>(null); } if (arguments.producerName != null) { String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); @@ -659,8 +665,6 @@ public class PerformanceProducer { } // Send messages on all topics/producers long totalSent = 0; - AtomicReference<Transaction> transactionAtomicReference = buildTransaction(client, - arguments.isEnableTransaction, arguments.transactionTimeout); AtomicLong numMessageSend = new AtomicLong(0); Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction); while (true) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java index 5127f85..eee284b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java @@ -19,7 +19,6 @@ package org.apache.pulsar.testclient; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.pulsar.testclient.utils.PerformanceUtils.buildTransaction; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -284,7 +283,7 @@ public class PerformanceTransaction { ExecutorService executorService = new ThreadPoolExecutor(arguments.numTestThreads, arguments.numTestThreads, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); + new LinkedBlockingQueue<>()); long startTime = System.nanoTime(); @@ -311,16 +310,23 @@ public class PerformanceTransaction { //A thread may perform tasks of multiple transactions in a traversing manner. List<Producer<byte[]>> producers = null; List<List<Consumer<byte[]>>> consumers = null; + AtomicReference<Transaction> atomicReference = null; try { producers = buildProducers(client, arguments); consumers = buildConsumer(client, arguments); + if (!arguments.isDisableTransaction) { + atomicReference = new AtomicReference<>(client.newTransaction() + .withTransactionTimeout(arguments.transactionTimeout, TimeUnit.SECONDS) + .build() + .get()); + } else { + atomicReference = new AtomicReference<>(null); + } } catch (Exception e) { log.error("Failed to build Producer/Consumer with exception : ", e); executorService.shutdownNow(); PerfClientUtils.exit(-1); } - AtomicReference<Transaction> atomicReference = buildTransaction(client, - !arguments.isDisableTransaction, arguments.transactionTimeout); //The while loop has no break, and finally ends the execution through the shutdownNow of //the executorService while (true) { @@ -351,7 +357,7 @@ public class PerformanceTransaction { for (List<Consumer<byte[]>> subscriptions : consumers) { for (Consumer<byte[]> consumer : subscriptions) { for (int j = 0; j < arguments.numMessagesReceivedPerTransaction; j++) { - Message message = null; + Message<byte[]> message = null; try { message = consumer.receive(); } catch (PulsarClientException e) { @@ -690,9 +696,7 @@ public class PerformanceTransaction { .sendTimeout(0, TimeUnit.SECONDS); final List<Future<Producer<byte[]>>> producerFutures = Lists.newArrayList(); - Iterator<String> produceTopicsIterator = arguments.producerTopic.iterator(); - while(produceTopicsIterator.hasNext()){ - String topic = produceTopicsIterator.next(); + for (String topic : arguments.producerTopic) { log.info("Create producer for topic {}", topic); producerFutures.add(producerBuilder.clone().topic(topic).createAsync()); } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java deleted file mode 100644 index ded1131..0000000 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/utils/PerformanceUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.testclient.utils; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.transaction.Transaction; -import org.apache.pulsar.testclient.PerformanceProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PerformanceUtils { - - private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class); - - public static AtomicReference<Transaction> buildTransaction(PulsarClient pulsarClient, boolean isEnableTransaction, - long transactionTimeout) { - - AtomicLong numBuildTxnFailed = new AtomicLong(); - if (isEnableTransaction) { - while(true) { - AtomicReference atomicReference = null; - try { - atomicReference = new AtomicReference(pulsarClient.newTransaction() - .withTransactionTimeout(transactionTimeout, TimeUnit.SECONDS).build().get()); - } catch (Exception e) { - numBuildTxnFailed.incrementAndGet(); - if (numBuildTxnFailed.get()%10 == 0) { - log.error("Failed to new a transaction with {} times", numBuildTxnFailed.get(), e); - } - } - if (atomicReference != null && atomicReference.get() != null) { - log.info("After {} failures, the transaction was created successfully for the first time", - numBuildTxnFailed.get()); - return atomicReference; - } - } - } - return new AtomicReference<>(null); - } -} diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java index c5e62f7..a08fbe0 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java @@ -91,8 +91,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { @Test public void testTxnPerf() throws Exception { String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u %s -ss %s -np 1 -au %s"; - String testConsumeTopic = testTopic + UUID.randomUUID().toString(); - String testProduceTopic = testTopic + UUID.randomUUID().toString(); + String testConsumeTopic = testTopic + UUID.randomUUID(); + String testProduceTopic = testTopic + UUID.randomUUID(); String testSub = "testSub"; admin.topics().createPartitionedTopic(testConsumeTopic, 1); String args = String.format(argString, testConsumeTopic, testProduceTopic, @@ -119,9 +119,8 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { CountDownLatch countDownLatch = new CountDownLatch(50); for (int i = 0; i < 50 ; i++) { - produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun(() -> { - countDownLatch.countDown(); - }); + produceToConsumeTopic.newMessage().value(("testConsume " + i).getBytes()).sendAsync().thenRun( + countDownLatch::countDown); } countDownLatch.await(); @@ -149,11 +148,11 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); for (int i = 0; i < 50; i++) { - Message message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message); consumeFromProduceTopic.acknowledge(message); } - Message message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumeFromConsumeTopic.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); @@ -187,16 +186,16 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { .enableBatchIndexAcknowledgment(false) .subscribe(); for (int i = 0; i < totalMessage; i++) { - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message); consumer.acknowledge(message); } - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); } @Test - public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException, ExecutionException { + public void testConsumeTxnMessage() throws InterruptedException, PulsarClientException { String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d"; String subName = "sub"; String topic = testTopic + UUID.randomUUID(); @@ -230,10 +229,10 @@ public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest { .enableBatchIndexAcknowledgment(false) .subscribe(); for (int i = 0; i < 5; i++) { - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNotNull(message); } - Message message = consumer.receive(2, TimeUnit.SECONDS); + Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS); Assert.assertNull(message); }
