congbobo184 commented on a change in pull request #11933:
URL: https://github.com/apache/pulsar/pull/11933#discussion_r716126385
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -340,9 +344,123 @@ public static void main(String[] args) throws Exception {
if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}
-
PulsarClient pulsarClient = clientBuilder.build();
+ AtomicReference<Transaction> atomicReference =
buildTransaction(pulsarClient, arguments.isEnableTransaction,
+ arguments.transactionTimeout);
+
+ AtomicLong messageAckedCount = new AtomicLong();
+ Semaphore messageReceiveLimiter = new
Semaphore(arguments.numMessagesPerTransaction);
+ Thread thread = Thread.currentThread();
+ MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+ if(arguments.isEnableTransaction){
+ try {
+ messageReceiveLimiter.acquire();
+ }catch (InterruptedException e){
+ log.error("Got error: ", e);
+ }
+ }
+ if (arguments.testTime > 0) {
+ if (System.nanoTime() > testEndTime) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ if(arguments.totalNumTxn > 0) {
+ if (totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ messagesReceived.increment();
+ bytesReceived.add(msg.size());
+
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(msg.size());
+
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ long latencyMillis = System.currentTimeMillis() -
msg.getPublishTime();
+ if (latencyMillis >= 0) {
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
+ }
+ if (arguments.isEnableTransaction) {
+ consumer.acknowledgeAsync(msg.getMessageId(),
atomicReference.get()).thenRun(() -> {
+ totalMessageAck.increment();
+ messageAck.increment();
+ }).exceptionally(throwable ->{
+ log.error("Ack message {} failed with exception", msg,
throwable);
+ totalMessageAckFailed.increment();
+ return null;
+ });
+ } else {
+ consumer.acknowledgeAsync(msg).thenRun(()->{
+ totalMessageAck.increment();
+ messageAck.increment();
+ }
+ ).exceptionally(throwable ->{
+ log.error("Ack message {} failed with
exception", msg, throwable);
+ totalMessageAckFailed.increment();
+ return null;
+ }
+ );
+ }
+ if(arguments.poolMessages) {
+ msg.release();
+ }
+ if (arguments.isEnableTransaction
Review comment:
why use repeat arguments.isEnableTransaction
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -619,7 +669,15 @@ private static void runProducer(int producerId,
}
}
rateLimiter.acquire();
-
+ if(arguments.isEnableTransaction &&
arguments.numMessagesPerTransaction > 0){
Review comment:
the same as performanceConsumer
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ boolean noMessage = false;
+ for (int i = 0; i < 10; i++) {
+ Message message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
Review comment:
should receive 10 message and can't receive 11 messages
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
Review comment:
should add sub before send message, otherwise test will become unstable
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -406,33 +537,78 @@ public static void main(String[] args) throws Exception {
long total = totalMessagesReceived.sum();
double rate = messagesReceived.sumThenReset() / elapsed;
double throughput = bytesReceived.sumThenReset() / elapsed * 8 /
1024 / 1024;
-
+ double rateAck = messageAck.sumThenReset() / elapsed;
+ long totalTxnOpSuccessNum = 0;
+ long totalTxnOpFailNum = 0;
+ double rateOpenTxn = 0;
+ if (arguments.isEnableTransaction) {
+ totalTxnOpSuccessNum = totalEndTxnOpSuccessNum.sum();
+ totalTxnOpFailNum = totalEndTxnOpFailNum.sum();
+ rateOpenTxn = numTxnOp.sumThenReset() / elapsed;
+ }
reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+ if(arguments.isEnableTransaction) {
+ log.info("--- Transaction: {} transaction end successfully ---
{} transaction end failed "
+ + "--- {} Txn/s --- AckRate: {} msg/s",
+ totalTxnOpSuccessNum,
+ totalTxnOpFailNum,
+ dec.format(rateOpenTxn),
+ dec.format(rateAck));
+ }
log.info(
- "Throughput received: {} msg --- {} msg/s -- {} Mbit/s
--- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} -
99.99pct: {} - Max: {}",
+ "Throughput received: {} msg --- {} msg/s -- {} Mbit/s "
+ + "--- Latency: mean: {} ms - med: {} "
+ + "- 95pct: {} - 99pct: {} - 99.9pct: {} -
99.99pct: {} - Max: {}",
intFormat.format(total),
dec.format(rate), dec.format(throughput),
dec.format(reportHistogram.getMean()),
reportHistogram.getValueAtPercentile(50),
reportHistogram.getValueAtPercentile(95),
reportHistogram.getValueAtPercentile(99),
reportHistogram.getValueAtPercentile(99.9),
reportHistogram.getValueAtPercentile(99.99),
reportHistogram.getMaxValue());
-
reportHistogram.reset();
oldTime = now;
}
pulsarClient.close();
}
- private static void printAggregatedThroughput(long start) {
+ private static void printAggregatedThroughput(long start, Arguments
arguments) {
double elapsed = (System.nanoTime() - start) / 1e9;
double rate = totalMessagesReceived.sum() / elapsed;
double throughput = totalBytesReceived.sum() / elapsed * 8 / 1024 /
1024;
+ long totalEndTxnSuccess = 0;
+ long totalEndTxnFail = 0;
+ long numTransactionOpenFailed = 0;
+ long numTransactionOpenSuccess = 0;
+ long totalnumMessageAckFailed = 0;
+ double rateAck = totalMessageAck.sum() / elapsed;
+ double rateOpenTxn = 0;
+ if (arguments.isEnableTransaction) {
+ totalEndTxnSuccess = totalEndTxnOpSuccessNum.sum();
+ totalEndTxnFail = totalEndTxnOpFailNum.sum();
+ rateOpenTxn = (totalEndTxnSuccess + totalEndTxnFail) / elapsed;
+ totalnumMessageAckFailed = totalMessageAckFailed.sum();
+ numTransactionOpenFailed = totalNumTxnOpenTxnFail.sum();
+ numTransactionOpenSuccess = totalNumTxnOpenTxnSuccess.sum();
+ }
+ if(arguments.isEnableTransaction){
Review comment:
two if block is repeat
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
##########
@@ -680,20 +788,44 @@ private static void runProducer(int producerId,
if (null != client) {
try {
client.close();
+ PerfClientUtils.exit(-1);
} catch (PulsarClientException e) {
log.error("Failed to close test client", e);
}
}
}
}
- private static void printAggregatedThroughput(long start) {
+ private static void printAggregatedThroughput(long start, Arguments
arguments) {
double elapsed = (System.nanoTime() - start) / 1e9;
double rate = totalMessagesSent.sum() / elapsed;
double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+ long totalTxnSuccess = 0;
+ long totalTxnFail = 0;
+ double rateOpenTxn = 0;
+ long numTransactionOpenFailed = 0;
+ long numTransactionOpenSuccess = 0;
+ if (arguments.isEnableTransaction) {
+ totalTxnSuccess = totalEndTxnOpSuccessNum.sum();
+ totalTxnFail = totalEndTxnOpFailNum.sum();
+ rateOpenTxn = elapsed / (totalTxnFail + totalTxnSuccess);
+ numTransactionOpenFailed = totalNumTxnOpenTxnFail.sum();
+ numTransactionOpenSuccess = totalNumTxnOpenTxnSuccess.sum();
+ }
+
+ if(arguments.isEnableTransaction){
Review comment:
arguments.isEnableTransaction
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
Review comment:
should receive 500 messages, I suggest you can send 50 messages to test.
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ boolean noMessage = false;
+ for (int i = 0; i < 10; i++) {
+ Message message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ if(message == null){
+ noMessage = true;
+ }
+ }
+ Assert.assertTrue(noMessage);
+ }
+
+
+ @Test
+ public void testProduceTxnMessage() throws InterruptedException,
PulsarClientException {
+ String argString = "%s -r 10 -u %s -m %d -txn";
+ String topic = testTopic + UUID.randomUUID();
+ int totalMessage = 500;
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), totalMessage);
+
+ Thread thread = new Thread(() -> {
+ try {
+ log.info("");
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableBatchIndexAcknowledgment(false)
+ .subscribe();
+ for (int i = 0; i < totalMessage; i++) {
+ Message message = consumer.receive();
Review comment:
ack this message. If ack 500 messages, we can't receive message anymore.
I suggest the test message change to 100.
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ boolean noMessage = false;
+ for (int i = 0; i < 10; i++) {
+ Message message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ if(message == null){
+ noMessage = true;
+ }
+ }
+ Assert.assertTrue(noMessage);
+ }
+
+
+ @Test
+ public void testProduceTxnMessage() throws InterruptedException,
PulsarClientException {
+ String argString = "%s -r 10 -u %s -m %d -txn";
+ String topic = testTopic + UUID.randomUUID();
+ int totalMessage = 500;
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), totalMessage);
+
+ Thread thread = new Thread(() -> {
+ try {
+ log.info("");
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableBatchIndexAcknowledgment(false)
+ .subscribe();
+ for (int i = 0; i < totalMessage; i++) {
+ Message message = consumer.receive();
+ Assert.assertNotNull(message);
+ }
+
+
+ }
+
+ @Test
+ public void testConsumeTxnMessage() throws InterruptedException,
PulsarClientException, ExecutionException {
+ String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
+ String subName = "sub";
+ String topic = testTopic + UUID.randomUUID();
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), subName,
+ SubscriptionType.Exclusive,
SubscriptionInitialPosition.Earliest, 10);
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ for (int i = 0; i < 505; i++) {
Review comment:
should add sub before send message, otherwise test will become unstable
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
Review comment:
should add sub before send message, otherwise test will become unstable
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ boolean noMessage = false;
+ for (int i = 0; i < 10; i++) {
+ Message message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ if(message == null){
+ noMessage = true;
+ }
+ }
+ Assert.assertTrue(noMessage);
+ }
+
+
+ @Test
+ public void testProduceTxnMessage() throws InterruptedException,
PulsarClientException {
+ String argString = "%s -r 10 -u %s -m %d -txn";
+ String topic = testTopic + UUID.randomUUID();
+ int totalMessage = 500;
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), totalMessage);
+
+ Thread thread = new Thread(() -> {
+ try {
+ log.info("");
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableBatchIndexAcknowledgment(false)
+ .subscribe();
+ for (int i = 0; i < totalMessage; i++) {
Review comment:
should add sub before send message, otherwise test will become unstable
##########
File path:
pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
##########
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.google.common.collect.Sets;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
+ private final String testTenant = "pulsar";
+ private final String testNamespace = "perf";
+ private final String myNamespace = testTenant + "/" + testNamespace;
+ private final String testTopic = "persistent://" + myNamespace + "/test-";
+ private final AtomicInteger lastExitCode = new AtomicInteger(0);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ PerfClientUtils.setExitProcedure(code -> {
+ log.error("JVM exit code is {}", code);
+ if (code != 0) {
+ throw new RuntimeException("JVM should exit with code " +
code);
+ }
+ });
+ // Setup namespaces
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ int exitCode = lastExitCode.get();
+ if (exitCode != 0) {
+ fail("Unexpected JVM exit code "+exitCode);
+ }
+ }
+
+ @Test
+ public void testTxnPerf() throws Exception {
+ String argString = "--topics-c %s --topics-p %s -threads 5 -ntxn 500
-u %s -ss %s -np 1 -au %s";
+ String testConsumeTopic = testTopic + UUID.randomUUID().toString();
+ String testProduceTopic = testTopic + UUID.randomUUID().toString();
+ String testSub = "testSub";
+ admin.topics().createPartitionedTopic(testConsumeTopic, 1);
+ String args = String.format(argString, testConsumeTopic,
testProduceTopic,
+ pulsar.getBrokerServiceUrl(), testSub, new
URL(pulsar.getWebServiceAddress()));
+
+
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .enableTransaction(true)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .connectionsPerBroker(100)
+ .statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> produceToConsumeTopic =
pulsarClient.newProducer(Schema.BYTES)
+ .producerName("perf-transaction-producer")
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(testConsumeTopic)
+ .create();
+
+ CountDownLatch countDownLatch = new CountDownLatch(500);
+ for (int i = 0; i < 510
+ ; i++) {
+ produceToConsumeTopic.newMessage().value(("testConsume " +
i).getBytes()).sendAsync().thenRun(() -> {
+ countDownLatch.countDown();
+ });
+ }
+
+ countDownLatch.await();
+
+ Thread thread = new Thread(() -> {
+ try {
+ PerformanceTransaction.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumeFromConsumeTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-consumeVerify")
+ .topic(testConsumeTopic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ Consumer<byte[]> consumeFromProduceTopic =
pulsarClient.newConsumer(Schema.BYTES)
+ .consumerName("perf-transaction-produceVerify")
+ .topic(testProduceTopic)
+ .subscriptionName(testSub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ for (int i = 0; i < 5; i++) {
+ Message message = consumeFromProduceTopic.receive(2,
TimeUnit.SECONDS);
+ Assert.assertNotNull(message);
+ }
+ boolean noMessage = false;
+ for (int i = 0; i < 10; i++) {
+ Message message = consumeFromConsumeTopic.receive(2,
TimeUnit.SECONDS);
+ if(message == null){
+ noMessage = true;
+ }
+ }
+ Assert.assertTrue(noMessage);
+ }
+
+
+ @Test
+ public void testProduceTxnMessage() throws InterruptedException,
PulsarClientException {
+ String argString = "%s -r 10 -u %s -m %d -txn";
+ String topic = testTopic + UUID.randomUUID();
+ int totalMessage = 500;
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), totalMessage);
+
+ Thread thread = new Thread(() -> {
+ try {
+ log.info("");
+ PerformanceProducer.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableBatchIndexAcknowledgment(false)
+ .subscribe();
+ for (int i = 0; i < totalMessage; i++) {
+ Message message = consumer.receive();
+ Assert.assertNotNull(message);
+ }
+
+
+ }
+
+ @Test
+ public void testConsumeTxnMessage() throws InterruptedException,
PulsarClientException, ExecutionException {
+ String argString = "%s -r 10 -u %s -txn -ss %s -st %s -sp %s -ntxn %d";
+ String subName = "sub";
+ String topic = testTopic + UUID.randomUUID();
+ String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), subName,
+ SubscriptionType.Exclusive,
SubscriptionInitialPosition.Earliest, 10);
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ for (int i = 0; i < 505; i++) {
+ producer.newMessage().send();
+ }
+ Thread thread = new Thread(() -> {
+ try {
+ log.info("");
+ PerformanceConsumer.main(args.split(" "));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ thread.join();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .enableBatchIndexAcknowledgment(false)
+ .subscribe();
+ boolean noMessageAgain = false;
+ for (int i = 0; i < 6; i++) {
+ Message message = consumer.receive(2, TimeUnit.SECONDS);
Review comment:
we can receive 5 message and can't receive more than 5 messages.
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -340,9 +344,123 @@ public static void main(String[] args) throws Exception {
if (isNotBlank(arguments.listenerName)) {
clientBuilder.listenerName(arguments.listenerName);
}
-
PulsarClient pulsarClient = clientBuilder.build();
+ AtomicReference<Transaction> atomicReference =
buildTransaction(pulsarClient, arguments.isEnableTransaction,
+ arguments.transactionTimeout);
+
+ AtomicLong messageAckedCount = new AtomicLong();
+ Semaphore messageReceiveLimiter = new
Semaphore(arguments.numMessagesPerTransaction);
+ Thread thread = Thread.currentThread();
+ MessageListener<ByteBuffer> listener = (consumer, msg) -> {
+ if(arguments.isEnableTransaction){
+ try {
+ messageReceiveLimiter.acquire();
+ }catch (InterruptedException e){
+ log.error("Got error: ", e);
+ }
+ }
+ if (arguments.testTime > 0) {
+ if (System.nanoTime() > testEndTime) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ if(arguments.totalNumTxn > 0) {
+ if (totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= arguments.totalNumTxn) {
+ log.info("------------------- DONE
-----------------------");
+ printAggregatedStats();
+ PerfClientUtils.exit(0);
+ thread.interrupt();
+ }
+ }
+ messagesReceived.increment();
+ bytesReceived.add(msg.size());
+
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(msg.size());
+
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ long latencyMillis = System.currentTimeMillis() -
msg.getPublishTime();
+ if (latencyMillis >= 0) {
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
+ }
+ if (arguments.isEnableTransaction) {
Review comment:
why use repeat arguments.isEnableTransaction
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]