This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 67fa1a45aa4 [cleanup][misc] Remove obsolete LoadSimulationClient and LoadSimulationController (#25926)
67fa1a45aa4 is described below
commit 67fa1a45aa44042ea8a58978752669129df42280
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 01:59:08 2026 -0700
[cleanup][misc] Remove obsolete LoadSimulationClient and LoadSimulationController (#25926)
---
.../testclient/CmdGenerateDocumentation.java | 2 -
.../pulsar/testclient/LoadSimulationClient.java | 368 -----------
.../testclient/LoadSimulationController.java | 711 ---------------------
.../pulsar/testclient/PulsarPerfTestTool.java | 2 -
4 files changed, 1083 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
index 41f002b80c8..0b47e2e95a4 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
@@ -55,8 +55,6 @@ public class CmdGenerateDocumentation extends CmdBase{
cmdClassMap.put("transaction", PerformanceTransaction.class);
cmdClassMap.put("read", PerformanceReader.class);
cmdClassMap.put("monitor-brokers", BrokerMonitor.class);
- cmdClassMap.put("simulation-client", LoadSimulationClient.class);
- cmdClassMap.put("simulation-controller",
LoadSimulationController.class);
cmdClassMap.put("websocket-producer", PerformanceClient.class);
cmdClassMap.put("managed-ledger", ManagedLedgerWriter.class);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
deleted file mode 100644
index f5aeba623ed..00000000000
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient;
-
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.re2j.Pattern;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import lombok.CustomLog;
-import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-
-/**
- * LoadSimulationClient is used to simulate client load by maintaining
producers and consumers for topics. Instances of
- * this class are controlled across a network via LoadSimulationController.
- */
-@Command(name = "simulation-client",
- description = "Simulate client load by maintaining producers and
consumers for topics.")
-@CustomLog
-public class LoadSimulationClient extends CmdBase{
-
- // Values for command encodings.
- public static final byte CHANGE_COMMAND = 0;
- public static final byte STOP_COMMAND = 1;
- public static final byte TRADE_COMMAND = 2;
- public static final byte CHANGE_GROUP_COMMAND = 3;
- public static final byte STOP_GROUP_COMMAND = 4;
- public static final byte FIND_COMMAND = 5;
-
- private ExecutorService executor;
- // Map from a message size to a cached byte[] of that size.
- private final Map<Integer, byte[]> payloadCache;
-
- // Map from a full topic name to the TradeUnit created for that topic.
- private final Map<String, TradeUnit> topicsToTradeUnits;
-
- // Pulsar admin to create namespaces with.
- private PulsarAdmin admin;
-
- // Pulsar client to create producers and consumers with.
- private PulsarClient client;
-
- // A TradeUnit is a Consumer and Producer pair. The rate of message
- // consumption as well as size may be changed at
- // any time, and the TradeUnit may also be stopped.
- private static class TradeUnit {
- Future<Consumer<byte[]>> consumerFuture;
- final AtomicBoolean stop;
- final RateLimiter rateLimiter;
-
- // Creating a byte[] for every message is stressful for a client
- // machine, so in order to ensure that any
- // message size may be sent/changed while reducing object creation, the
- // byte[] is wrapped in an AtomicReference.
- final AtomicReference<byte[]> payload;
- final PulsarClient client;
- final String topic;
- final Map<Integer, byte[]> payloadCache;
-
- public TradeUnit(final TradeConfiguration tradeConf, final
PulsarClient client,
- final Map<Integer, byte[]> payloadCache) throws Exception {
- consumerFuture = client.newConsumer()
- .topic(tradeConf.topic)
- .subscriptionName("Subscriber-" + tradeConf.topic)
- .messageListener(ackListener)
- .subscribeAsync();
- this.payload = new AtomicReference<>();
- this.payloadCache = payloadCache;
- this.client = client;
- topic = tradeConf.topic;
-
- // Add a byte[] of the appropriate size if it is not already
present
- // in the cache.
- this.payload.set(payloadCache.computeIfAbsent(tradeConf.size,
byte[]::new));
- rateLimiter = RateLimiter.create(tradeConf.rate);
- stop = new AtomicBoolean(false);
- }
-
- // Change the message rate/size according to the given configuration.
- public void change(final TradeConfiguration tradeConf) {
- rateLimiter.setRate(tradeConf.rate);
- this.payload.set(payloadCache.computeIfAbsent(tradeConf.size,
byte[]::new));
- }
-
- // Attempt to create a Producer indefinitely. Useful for ensuring
- // messages continue to be sent after broker
- // restarts occur.
- private Producer<byte[]> getNewProducer() throws Exception {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- return client.newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .create();
-
- } catch (Exception e) {
- Thread.sleep(10000);
- }
- }
- throw new InterruptedException();
- }
-
- private class MutableBoolean {
- public volatile boolean value = true;
- }
-
- public void start() throws Exception {
- Producer<byte[]> producer = getNewProducer();
- final Consumer<byte[]> consumer = consumerFuture.get();
- while (!stop.get()) {
- final MutableBoolean wellnessFlag = new MutableBoolean();
- final Function<Throwable, ? extends MessageId>
exceptionHandler = e -> {
- // Unset the well flag in the case of an exception so we
can
- // try to get a new Producer.
- wellnessFlag.value = false;
- if (PerfClientUtils.hasInterruptedException(e)) {
- Thread.currentThread().interrupt();
- }
- return null;
- };
- while (!stop.get() && wellnessFlag.value) {
-
producer.sendAsync(payload.get()).exceptionally(exceptionHandler);
- rateLimiter.acquire();
- }
- producer.closeAsync();
- if (!stop.get()) {
- // The Producer failed due to an exception: attempt to get
- // another producer.
- producer = getNewProducer();
- } else {
- // We are finished: close the consumer.
- consumer.closeAsync();
- }
- }
- }
- }
-
- // picocli arguments for starting a LoadSimulationClient.
-
- @Option(names = { "--port" }, description = "Port to listen on for
controller", required = true)
- public int port;
-
- @Option(names = { "--service-url" }, description = "Pulsar Service URL",
required = true)
- public String serviceURL;
-
- @Option(names = { "-ml", "--memory-limit", }, description = "Configure the
Pulsar client memory limit "
- + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
- public long memoryLimit = 0L;
-
-
- // Configuration class for initializing or modifying TradeUnits.
- private static class TradeConfiguration {
- public byte command;
- public String topic;
- public double rate;
- public int size;
- public String tenant;
- public String group;
-
- public TradeConfiguration() {
- command = -1;
- rate = 100;
- size = 1024;
- }
- }
-
- // Handle input sent from a controller.
- private void handle(final Socket socket) throws Exception {
- final DataInputStream inputStream = new
DataInputStream(socket.getInputStream());
- int command;
- while ((command = inputStream.read()) != -1) {
- handle((byte) command, inputStream, new
DataOutputStream(socket.getOutputStream()));
- }
- }
-
- // Decode TradeConfiguration fields common for topic creation and
- // modification.
- private void decodeProducerOptions(final TradeConfiguration tradeConf,
final DataInputStream inputStream)
- throws Exception {
- tradeConf.topic = inputStream.readUTF();
- tradeConf.size = inputStream.readInt();
- tradeConf.rate = inputStream.readDouble();
- }
-
- // Decode TradeConfiguration fields common for group commands.
- private void decodeGroupOptions(final TradeConfiguration tradeConf, final
DataInputStream inputStream)
- throws Exception {
- tradeConf.tenant = inputStream.readUTF();
- tradeConf.group = inputStream.readUTF();
- }
-
- // Handle a command sent from a controller.
- private void handle(final byte command, final DataInputStream inputStream,
final DataOutputStream outputStream)
- throws Exception {
- final TradeConfiguration tradeConf = new TradeConfiguration();
- tradeConf.command = command;
- switch (command) {
- case CHANGE_COMMAND:
- // Change the topic's settings if it exists.
- decodeProducerOptions(tradeConf, inputStream);
- if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
- topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
- }
- break;
- case STOP_COMMAND:
- // Stop the topic if it exists.
- tradeConf.topic = inputStream.readUTF();
- if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
- topicsToTradeUnits.get(tradeConf.topic).stop.set(true);
- }
- break;
- case TRADE_COMMAND:
- // Create the topic. It is assumed that the topic does not already
exist.
- decodeProducerOptions(tradeConf, inputStream);
- final TradeUnit tradeUnit = new TradeUnit(tradeConf, client,
payloadCache);
- topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
- executor.submit(() -> {
- try {
- final String topic = tradeConf.topic;
- final String namespace =
topic.substring("persistent://".length(), topic.lastIndexOf('/'));
- try {
- admin.namespaces().createNamespace(namespace);
- } catch (PulsarAdminException.ConflictException e) {
- // Ignore, already created namespace.
- }
- tradeUnit.start();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- });
- break;
- case CHANGE_GROUP_COMMAND:
- // Change the settings of all topics belonging to a group.
- decodeGroupOptions(tradeConf, inputStream);
- tradeConf.size = inputStream.readInt();
- tradeConf.rate = inputStream.readDouble();
- // See if a topic belongs to this tenant and group using this
regex.
- final Pattern groupRegex =
- Pattern.compile(".*://" + tradeConf.tenant + "/.*/" +
tradeConf.group + "-.*/.*");
-
- for (Map.Entry<String, TradeUnit> entry :
topicsToTradeUnits.entrySet()) {
- final String topic = entry.getKey();
- final TradeUnit unit = entry.getValue();
-
- if (groupRegex.matcher(topic).matches()) {
- unit.change(tradeConf);
- }
- }
- break;
- case STOP_GROUP_COMMAND:
- // Stop all topics belonging to a group.
- decodeGroupOptions(tradeConf, inputStream);
- // See if a topic belongs to this tenant and group using this
regex.
- final Pattern regex = Pattern.compile(".*://" + tradeConf.tenant +
"/.*/" + tradeConf.group + "-.*/.*");
- for (Map.Entry<String, TradeUnit> entry :
topicsToTradeUnits.entrySet()) {
- final String topic = entry.getKey();
- final TradeUnit unit = entry.getValue();
- if (regex.matcher(topic).matches()) {
- unit.stop.set(true);
- }
- }
- break;
- case FIND_COMMAND:
- // Write a single boolean indicating if the topic was found.
-
outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF()));
- outputStream.flush();
- break;
- default:
- throw new IllegalArgumentException("Unrecognized command code
received: " + command);
- }
- }
-
- // Make listener as lightweight as possible.
- private static final MessageListener<byte[]> ackListener =
Consumer::acknowledgeAsync;
-
- /**
- * Create a LoadSimulationClient with the given picocli this.
- *
- */
- public LoadSimulationClient() throws PulsarClientException {
- super("simulation-client");
- payloadCache = new ConcurrentHashMap<>();
- topicsToTradeUnits = new ConcurrentHashMap<>();
- }
-
- /**
- * Start a client with command line this.
- *
- */
- @Override
- @SuppressWarnings("deprecation")
- public void run() throws Exception {
- admin = PulsarAdmin.builder()
- .serviceHttpUrl(this.serviceURL)
- .build();
- client = PulsarClient.builder()
- .memoryLimit(this.memoryLimit, SizeUnit.BYTES)
- .serviceUrl(this.serviceURL)
- .connectionsPerBroker(4)
- .ioThreads(Runtime.getRuntime().availableProcessors())
- .statsInterval(0, TimeUnit.SECONDS)
- .build();
- executor = Executors.newCachedThreadPool(new
DefaultThreadFactory("test-client"));
- PerfClientUtils.printJVMInformation(log);
- this.start();
- }
-
- /**
- * Start listening for controller commands to create producers and
consumers.
- */
- public void start() throws Exception {
- final ServerSocket serverSocket = new ServerSocket(port);
-
- while (!Thread.currentThread().isInterrupted()) {
- // Technically, two controllers can be connected simultaneously,
but
- // non-sequential handling of commands
- // has not been tested or considered and is not recommended.
- log.info("Listening for controller command...");
- final Socket socket = serverSocket.accept();
- log.info().attr("connected",
socket.getInetAddress().getHostName()).log("Connected to");
- executor.submit(() -> {
- try {
- handle(socket);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- });
- }
- }
-}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
deleted file mode 100644
index 89ee07f46cd..00000000000
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient;
-
-import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
-import static
org.apache.pulsar.broker.resources.LoadBalanceResources.RESOURCE_QUOTA_BASE_PATH;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import lombok.CustomLog;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.loadbalance.LoadManager;
-import org.apache.pulsar.common.policies.data.ResourceQuota;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.policies.data.loadbalancer.BundleData;
-import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
-import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import picocli.CommandLine;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-import picocli.CommandLine.Parameters;
-
-/**
- * This class provides a shell for the user to dictate how simulation clients
should incur load.
- */
-@Command(name = "simulation-controller",
- description = "Provides a shell for the user to dictate how simulation
clients should "
- + "incur load.")
-@CustomLog
-public class LoadSimulationController extends CmdBase{
-
- // Input streams for each client to send commands through.
- private DataInputStream[] inputStreams;
-
- // Output streams for each client to receive information from.
- private DataOutputStream[] outputStreams;
-
- // client host names.
- private String[] clients;
-
- private Random random;
-
- private static final ExecutorService threadPool =
Executors.newCachedThreadPool();
-
- // picocli arguments for starting a controller via main.
-
- @Option(names = { "--cluster" }, description = "Cluster to test on",
required = true)
- String cluster;
-
- @Option(names = { "--clients" }, description = "Comma separated list of
client hostnames", required = true)
- String clientHostNames;
-
- @Option(names = { "--client-port" }, description = "Port that the clients
are listening on", required = true)
- int clientPort;
-
-
- // picocli arguments for accepting user input.
- private static class ShellArguments {
- @Parameters(description = "Command arguments:\n" + "trade tenant
namespace topic\n"
- + "change tenant namespace topic\n" + "stop tenant namespace
topic\n"
- + "trade_group tenant group_name num_namespaces\n" +
"change_group tenant group_name\n"
- + "stop_group tenant group_name\n" + "script script_name\n" +
"copy tenant_name source_zk target_zk\n"
- + "stream source_zk\n" + "simulate zk\n", arity = "1")
- List<String> commandArguments;
-
- @Option(names = { "--rand-rate" }, description = "Choose message rate
uniformly randomly from the next two "
- + "comma separated values (overrides --rate)")
- String rangeString = "";
-
- @Option(names = { "--rate" }, description = "Messages per second")
- double rate = 1;
-
- @Option(names = { "--rate-multiplier" }, description = "Multiplier to
use for copying or streaming rates")
- double rateMultiplier = 1;
-
- @Option(names = { "--separation" }, description = "Separation time in
ms for trade_group actions "
- + "(0 for no separation)")
- int separation = 0;
-
- @Option(names = { "--size" }, description = "Message size in bytes")
- int size = 1024;
-
- @Option(names = { "--topics-per-namespace" }, description = "Number of
topics to create per namespace in "
- + "trade_group (total number of topics is num_namespaces X
num_topics)")
- int topicsPerNamespace = 1;
- }
-
- // In stream mode, the BrokerWatcher watches the /loadbalance/broker zpath
and adds LoadReportWatchers accordingly
- // when new brokers come up.
- private class BrokerWatcher implements Watcher {
- private final ZooKeeper zkClient;
-
- // Currently observed brokers.
- private final Set<String> brokers;
-
- // Shell arguments to configure streaming with.
- private final ShellArguments arguments;
-
- private BrokerWatcher(final ZooKeeper zkClient, final ShellArguments
arguments) {
- this.zkClient = zkClient;
- this.arguments = arguments;
- brokers = new HashSet<>();
- // Observe the currently active brokers and put a watch on the
broker root.
- process(null);
- }
-
- // Add load report watchers for newly observed brokers.
- public synchronized void process(final WatchedEvent event) {
- try {
- final List<String> currentBrokers =
zkClient.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT, this);
- for (final String broker : currentBrokers) {
- if (!brokers.contains(broker)) {
- new LoadReportWatcher(String.format("%s/%s",
LoadManager.LOADBALANCE_BROKERS_ROOT, broker),
- zkClient, arguments);
- brokers.add(broker);
- }
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- // In stream mode, the LoadReportWatcher watches the /loadbalance/broker
children and adds or modifies topics with
- // suitable rates based on the most recent message rate and throughput
information.
- private class LoadReportWatcher implements Watcher {
- private final ZooKeeper zkClient;
- private final String path;
- private final ShellArguments arguments;
-
- public LoadReportWatcher(final String path, final ZooKeeper zkClient,
final ShellArguments arguments) {
- this.path = path;
- this.zkClient = zkClient;
- this.arguments = arguments;
- // Get initial topics and set this up as a watch by calling
process.
- process(null);
- }
-
- // Update the message rate information for the bundles in a recently
changed load report.
- public synchronized void process(final WatchedEvent event) {
- try {
- // Get the load report and put this back as a watch.
- final LoadReport loadReport =
ObjectMapperFactory.getMapper().getObjectMapper()
- .readValue(zkClient.getData(path, this, null),
LoadReport.class);
- for (final Map.Entry<String, NamespaceBundleStats> entry :
loadReport.getBundleStats().entrySet()) {
- final String bundle = entry.getKey();
- final String namespace = bundle.substring(0,
bundle.lastIndexOf('/'));
- final String topic = String.format("%s/%s", namespace,
"t");
- final NamespaceBundleStats stats = entry.getValue();
-
- // Approximate total message rate via average between
in/out.
- final double messageRate = arguments.rateMultiplier *
(stats.msgRateIn + stats.msgRateOut) / 2;
-
- // size = throughput / rate.
- final int messageSize = (int)
Math.ceil(arguments.rateMultiplier
- * (stats.msgThroughputIn + stats.msgThroughputOut)
/ (2 * messageRate));
-
- arguments.rate = messageRate;
- arguments.size = messageSize;
- // Try to modify the topic if it already exists.
Otherwise, create it.
- changeOrCreate(arguments, topic);
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- /**
- * Create a LoadSimulationController with the given picocli arguments.
- *
- */
- public LoadSimulationController() throws Exception {
- super("simulation-controller");
- }
-
- // Check that the expected number of application arguments matches the
- // actual number of application arguments.
- private boolean checkAppArgs(final int numAppArgs, final int numRequired) {
- if (numAppArgs != numRequired) {
- log.info()
- .attr("found", numAppArgs)
- .attr("required", numRequired)
- .log("ERROR: Wrong number of application arguments (found
, required");
- return false;
- }
- return true;
- }
-
- // Recursively acquire all resource quotas by getting the ZK children of
the given path and calling this function
- // on the children if there are any, or getting the data from this ZNode
otherwise.
- private void getResourceQuotas(final String path, final ZooKeeper zkClient,
- final Map<String, ResourceQuota>[] threadLocalMaps) throws
Exception {
- final List<String> children = zkClient.getChildren(path, false);
- if (children.isEmpty()) {
- threadLocalMaps[random.nextInt(clients.length)].put(path,
ObjectMapperFactory.getMapper().getObjectMapper()
- .readValue(zkClient.getData(path, false, null),
ResourceQuota.class));
- } else {
- for (final String child : children) {
- getResourceQuotas(String.format("%s/%s", path, child),
zkClient, threadLocalMaps);
- }
- }
- }
-
- // Initialize a BundleData from a resource quota and configurations and
modify the quota accordingly.
- private BundleData initializeBundleData(final ResourceQuota quota, final
ShellArguments arguments) {
- final double messageRate = (quota.getMsgRateIn() +
quota.getMsgRateOut()) / 2;
- final int messageSize = (int) Math.ceil((quota.getBandwidthIn() +
quota.getBandwidthOut()) / (2 * messageRate));
- arguments.rate = messageRate * arguments.rateMultiplier;
- arguments.size = messageSize;
- final NamespaceBundleStats startingStats = new NamespaceBundleStats();
-
- // Modify the original quota so that new rates are set.
- final double modifiedRate = messageRate * arguments.rateMultiplier;
- final double modifiedBandwidth = (quota.getBandwidthIn() +
quota.getBandwidthOut()) * arguments.rateMultiplier
- / 2;
- quota.setMsgRateIn(modifiedRate);
- quota.setMsgRateOut(modifiedRate);
- quota.setBandwidthIn(modifiedBandwidth);
- quota.setBandwidthOut(modifiedBandwidth);
-
- // Assume modified memory usage is comparable to the rate multiplier
times the original usage.
- quota.setMemory(quota.getMemory() * arguments.rateMultiplier);
- startingStats.msgRateIn = quota.getMsgRateIn();
- startingStats.msgRateOut = quota.getMsgRateOut();
- startingStats.msgThroughputIn = quota.getBandwidthIn();
- startingStats.msgThroughputOut = quota.getBandwidthOut();
- final BundleData bundleData = new BundleData(10, 1000, startingStats);
- // Assume there is ample history for the bundle.
- bundleData.getLongTermData().setNumSamples(1000);
- bundleData.getShortTermData().setNumSamples(10);
- return bundleData;
- }
-
- // Makes a topic string from a tenant name, namespace name, and topic
- // name.
- private String makeTopic(final String tenant, final String namespace,
final String topic) {
- return String.format("persistent://%s/%s/%s/%s", tenant, cluster,
namespace, topic);
- }
-
- // Write options that are common to modifying and creating topics.
- private void writeProducerOptions(final DataOutputStream outputStream,
final ShellArguments arguments,
- final String topic) throws Exception {
- if (!arguments.rangeString.isEmpty()) {
- // If --rand-rate was specified, extract the bounds by splitting on
- // the comma and parsing the resulting
- // doubles.
- final String[] splits = arguments.rangeString.split(",");
- if (splits.length != 2) {
- log.error("Argument to --rand-rate should be two
comma-separated values");
- return;
- }
- final double first = Double.parseDouble(splits[0]);
- final double second = Double.parseDouble(splits[1]);
- final double min = Math.min(first, second);
- final double max = Math.max(first, second);
- arguments.rate = random.nextDouble() * (max - min) + min;
- }
- outputStream.writeUTF(topic);
- outputStream.writeInt(arguments.size);
- outputStream.writeDouble(arguments.rate);
- }
-
- // Change producer settings for a given topic and picocli arguments.
- private void change(final ShellArguments arguments, final String topic,
final int client) throws Exception {
- outputStreams[client].write(LoadSimulationClient.CHANGE_COMMAND);
- writeProducerOptions(outputStreams[client], arguments, topic);
- outputStreams[client].flush();
- }
-
- // Change an existing topic, or create it if it does not exist.
- private int changeOrCreate(final ShellArguments arguments, final String
topic) throws Exception {
- final int client = find(topic);
- if (client == -1) {
- trade(arguments, topic, random.nextInt(clients.length));
- } else {
- change(arguments, topic, client);
- }
- return client;
- }
-
- // Find a topic and change it if it exists.
- private int changeIfExists(final ShellArguments arguments, final String
topic) throws Exception {
- final int client = find(topic);
- if (client != -1) {
- change(arguments, topic, client);
- }
- return client;
- }
-
- // Attempt to find a topic on the clients.
- private int find(final String topic) throws Exception {
- int clientWithTopic = -1;
- for (int i = 0; i < clients.length; ++i) {
- outputStreams[i].write(LoadSimulationClient.FIND_COMMAND);
- outputStreams[i].writeUTF(topic);
- }
- for (int i = 0; i < clients.length; ++i) {
- if (inputStreams[i].readBoolean()) {
- clientWithTopic = i;
- }
- }
- return clientWithTopic;
- }
-
- // Trade using the arguments parsed via picocli and the topic name.
- private synchronized void trade(final ShellArguments arguments, final
String topic, final int client)
- throws Exception {
- // Decide which client to send to randomly to preserve statelessness of
- // the controller.
- outputStreams[client].write(LoadSimulationClient.TRADE_COMMAND);
- writeProducerOptions(outputStreams[client], arguments, topic);
- outputStreams[client].flush();
- }
-
- // Handle the command line arguments associated with the change command.
- private void handleChange(final ShellArguments arguments) throws Exception
{
- final List<String> commandArguments = arguments.commandArguments;
- // Change expects three application arguments: tenant name, namespace
name, and topic name.
- if (checkAppArgs(commandArguments.size() - 1, 3)) {
- final String topic = makeTopic(commandArguments.get(1),
commandArguments.get(2),
- commandArguments.get(3));
- if (changeIfExists(arguments, topic) == -1) {
- log.info().attr("topic", topic).log("Topic not found");
- }
- }
- }
-
- // Handle the command line arguments associated with the copy command.
- @SuppressWarnings("unchecked")
- private void handleCopy(final ShellArguments arguments) throws Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Copy accepts 3 application arguments: Tenant name, source ZooKeeper
and target ZooKeeper connect strings.
- if (checkAppArgs(commandArguments.size() - 1, 3)) {
- final String tenantName = commandArguments.get(1);
- final String sourceZKConnectString = commandArguments.get(2);
- final String targetZKConnectString = commandArguments.get(3);
- final ZooKeeper sourceZKClient = new
ZooKeeper(sourceZKConnectString, 5000, null);
- final ZooKeeper targetZKClient = new
ZooKeeper(targetZKConnectString, 5000, null);
- // Make a map for each thread to speed up the ZooKeeper writing
process.
- final Map<String, ResourceQuota>[] threadLocalMaps = new
Map[clients.length];
- for (int i = 0; i < clients.length; ++i) {
- threadLocalMaps[i] = new HashMap<>();
- }
- getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, sourceZKClient,
threadLocalMaps);
- final List<Future> futures = new ArrayList<>(clients.length);
- int i = 0;
- log.info("Copying...");
- for (final Map<String, ResourceQuota> bundleToQuota :
threadLocalMaps) {
- final int j = i;
- futures.add(threadPool.submit(() -> {
- for (final Map.Entry<String, ResourceQuota> entry :
bundleToQuota.entrySet()) {
- final String bundle = entry.getKey();
- final ResourceQuota quota = entry.getValue();
- // Simulation will send messages in and out at about
the same rate, so just make the rate the
- // average of in and out.
-
- final int tenantStart =
RESOURCE_QUOTA_BASE_PATH.length() + 1;
- final int clusterStart = bundle.indexOf('/',
tenantStart) + 1;
- final String sourceTenant =
bundle.substring(tenantStart, clusterStart - 1);
- final int namespaceStart = bundle.indexOf('/',
clusterStart) + 1;
- final String sourceCluster =
bundle.substring(clusterStart, namespaceStart - 1);
- final String namespace =
bundle.substring(namespaceStart, bundle.lastIndexOf('/'));
- final String keyRangeString =
bundle.substring(bundle.lastIndexOf('/') + 1);
- // To prevent duplicate node issues for same namespace
names in different clusters/tenants.
- final String manglePrefix = String.format("%s-%s-%s",
sourceCluster, sourceTenant,
- keyRangeString);
- final String mangledNamespace = String.format("%s-%s",
manglePrefix, namespace);
- final BundleData bundleData =
initializeBundleData(quota, arguments);
- final String oldAPITargetPath = String.format(
- "%s/namespace/%s/%s/%s/0x00000000_0xffffffff",
BUNDLE_DATA_BASE_PATH, tenantName,
- cluster, mangledNamespace);
- final String newAPITargetPath = String.format(
- "%s/%s/%s/%s/0x00000000_0xffffffff",
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
- mangledNamespace);
- try {
- ZkUtils.createFullPathOptimistic(targetZKClient,
oldAPITargetPath,
-
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(quota),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- // Ignore already created nodes.
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- // Put the bundle data in the new ZooKeeper.
- try {
- ZkUtils.createFullPathOptimistic(targetZKClient,
newAPITargetPath,
-
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- // Ignore already created nodes.
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- try {
- trade(arguments, makeTopic(tenantName,
mangledNamespace, "t"), j);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }));
- ++i;
- }
- for (final Future future : futures) {
- future.get();
- }
- sourceZKClient.close();
- targetZKClient.close();
- }
- }
-
- // Handle the command line arguments associated with the simulate command.
- @SuppressWarnings("unchecked")
- private void handleSimulate(final ShellArguments arguments) throws
Exception {
- final List<String> commandArguments = arguments.commandArguments;
- checkAppArgs(commandArguments.size() - 1, 1);
- final ZooKeeper zkClient = new ZooKeeper(commandArguments.get(1),
5000, null);
- // Make a map for each thread to speed up the ZooKeeper writing
process.
- final Map<String, ResourceQuota>[] threadLocalMaps = new
Map[clients.length];
- for (int i = 0; i < clients.length; ++i) {
- threadLocalMaps[i] = new HashMap<>();
- }
- getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, zkClient, threadLocalMaps);
- final List<Future> futures = new ArrayList<>(clients.length);
- int i = 0;
- log.info("Simulating...");
- for (final Map<String, ResourceQuota> bundleToQuota : threadLocalMaps)
{
- final int j = i;
- futures.add(threadPool.submit(() -> {
- for (final Map.Entry<String, ResourceQuota> entry :
bundleToQuota.entrySet()) {
- final String bundle = entry.getKey();
- final String newAPIPath =
bundle.replace(RESOURCE_QUOTA_BASE_PATH, BUNDLE_DATA_BASE_PATH);
- final ResourceQuota quota = entry.getValue();
- final int tenantStart = RESOURCE_QUOTA_BASE_PATH.length()
+ 1;
- final String topic = String.format("persistent://%s/t",
bundle.substring(tenantStart));
- final BundleData bundleData = initializeBundleData(quota,
arguments);
- // Put the bundle data in the new ZooKeeper.
- try {
- ZkUtils.createFullPathOptimistic(zkClient, newAPIPath,
-
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- try {
- zkClient.setData(newAPIPath,
-
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData), -1);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- try {
- trade(arguments, topic, j);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }));
- ++i;
- }
- for (final Future future : futures) {
- future.get();
- }
- zkClient.close();
- }
-
- // Handle the command line arguments associated with the stop command.
- private void handleStop(final ShellArguments arguments) throws Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Stop expects three application arguments: tenant name, namespace
- // name, and topic name.
- if (checkAppArgs(commandArguments.size() - 1, 3)) {
- final String topic = makeTopic(commandArguments.get(1),
commandArguments.get(2),
- commandArguments.get(3));
- for (DataOutputStream outputStream : outputStreams) {
- outputStream.write(LoadSimulationClient.STOP_COMMAND);
- outputStream.writeUTF(topic);
- outputStream.flush();
- }
- }
- }
-
- // Handle the command line arguments associated with the stream command.
- private void handleStream(final ShellArguments arguments) throws Exception
{
- final List<String> commandArguments = arguments.commandArguments;
- // Stream accepts 1 application argument: ZooKeeper connect string.
- if (checkAppArgs(commandArguments.size() - 1, 1)) {
- final String zkConnectString = commandArguments.get(1);
- final ZooKeeper zkClient = new ZooKeeper(zkConnectString, 5000,
null);
- new BrokerWatcher(zkClient, arguments);
- // This controller will now stream rate changes from the given ZK.
- // Users wishing to stop this should Ctrl + C and use another
- // Controller to send new commands.
- Thread.currentThread().join();
- }
- }
-
- // Handle the command line arguments associated with the trade command.
- private void handleTrade(final ShellArguments arguments) throws Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Trade expects three application arguments: tenant, namespace, and
- // topic.
- if (checkAppArgs(commandArguments.size() - 1, 3)) {
- final String topic = makeTopic(commandArguments.get(1),
commandArguments.get(2),
- commandArguments.get(3));
- trade(arguments, topic, random.nextInt(clients.length));
- }
- }
-
- // Handle the command line arguments associated with the group change
command.
- private void handleGroupChange(final ShellArguments arguments) throws
Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Group change expects two application arguments: tenant name and
group
- // name.
- if (checkAppArgs(commandArguments.size() - 1, 2)) {
- final String tenant = commandArguments.get(1);
- final String group = commandArguments.get(2);
- for (DataOutputStream outputStream : outputStreams) {
- outputStream.write(LoadSimulationClient.CHANGE_GROUP_COMMAND);
- outputStream.writeUTF(tenant);
- outputStream.writeUTF(group);
- outputStream.writeInt(arguments.size);
- outputStream.writeDouble(arguments.rate);
- outputStream.flush();
- }
- }
- }
-
- // Handle the command line arguments associated with the group stop
command.
- private void handleGroupStop(final ShellArguments arguments) throws
Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Group stop requires two application arguments: tenant name and group
- // name.
- if (checkAppArgs(commandArguments.size() - 1, 2)) {
- final String tenant = commandArguments.get(1);
- final String group = commandArguments.get(2);
- for (DataOutputStream outputStream : outputStreams) {
- outputStream.write(LoadSimulationClient.STOP_GROUP_COMMAND);
- outputStream.writeUTF(tenant);
- outputStream.writeUTF(group);
- outputStream.flush();
- }
- }
- }
-
- // Handle the command line arguments associated with the group trade
command.
- private void handleGroupTrade(final ShellArguments arguments) throws
Exception {
- final List<String> commandArguments = arguments.commandArguments;
- // Group trade expects 3 application arguments: tenant name, group
name,
- // and number of namespaces.
- if (checkAppArgs(commandArguments.size() - 1, 3)) {
- final String tenant = commandArguments.get(1);
- final String group = commandArguments.get(2);
- final int numNamespaces =
Integer.parseInt(commandArguments.get(3));
- for (int i = 0; i < numNamespaces; ++i) {
- for (int j = 0; j < arguments.topicsPerNamespace; ++j) {
- // For each namespace and topic pair, create the namespace
- // by using the group name and the
- // namespace index, and then create the topic by using the
- // topic index. Then just call trade.
- final String topic = makeTopic(tenant,
String.format("%s-%d", group, i),
- Integer.toString(j));
- trade(arguments, topic, random.nextInt(clients.length));
- Thread.sleep(arguments.separation);
- }
- }
- }
- }
-
- /**
- * Read the user-submitted arguments as commands to send to clients.
- *
- * @param args
- * Arguments split on whitespace from user input.
- */
- private void read(final String[] args) {
- // Don't attempt to process blank input.
- if (args.length > 0 && !(args.length == 1 && args[0].isEmpty())) {
- final ShellArguments arguments = new ShellArguments();
- final CommandLine commander = new CommandLine(arguments);
- try {
- commander.parseArgs(args);
- final String command = arguments.commandArguments.get(0);
- switch (command) {
- case "trade":
- handleTrade(arguments);
- break;
- case "change":
- handleChange(arguments);
- break;
- case "stop":
- handleStop(arguments);
- break;
- case "trade_group":
- handleGroupTrade(arguments);
- break;
- case "change_group":
- handleGroupChange(arguments);
- break;
- case "stop_group":
- handleGroupStop(arguments);
- break;
- case "script":
- // Read input from the given script instead of stdin until
- // the script has executed completely.
- final List<String> commandArguments =
arguments.commandArguments;
- checkAppArgs(commandArguments.size() - 1, 1);
- final String scriptName = commandArguments.get(1);
- try (BufferedReader scriptReader = new BufferedReader(
- new InputStreamReader(new
FileInputStream(Paths.get(scriptName).toFile())))) {
- String line;
- while ((line = scriptReader.readLine()) != null) {
- read(line.split("\\s+"));
- }
- }
- break;
- case "copy":
- handleCopy(arguments);
- break;
- case "stream":
- handleStream(arguments);
- break;
- case "simulate":
- handleSimulate(arguments);
- break;
- case "quit":
- case "exit":
- PerfClientUtils.exit(0);
- break;
- default:
- log.info().attr("command", command).log("ERROR: Unknown
command \" \"");
- }
- } catch (ParameterException ex) {
- System.out.println(ex.getMessage());
- commander.usage(commander.getOut());
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- }
-
- /**
- * Create a shell for the user to send commands to clients.
- */
- public void start() throws Exception {
- BufferedReader inReader = new BufferedReader(new
InputStreamReader(System.in));
- while (!Thread.currentThread().isInterrupted()) {
- // Print the very simple prompt.
- System.out.println();
- System.out.print("> ");
- read(inReader.readLine().split("\\s+"));
- }
- }
-
- /**
- * Start a controller with command line arguments.
- *
- */
- @Override
- public void run() throws Exception {
- random = new Random();
- clients = this.clientHostNames.split(",");
- final Socket[] sockets = new Socket[clients.length];
- inputStreams = new DataInputStream[clients.length];
- outputStreams = new DataOutputStream[clients.length];
- log.info().attr("found", clients.length).log("Found clients");
- for (int i = 0; i < clients.length; ++i) {
- sockets[i] = new Socket(clients[i], clientPort);
- inputStreams[i] = new DataInputStream(sockets[i].getInputStream());
- outputStreams[i] = new
DataOutputStream(sockets[i].getOutputStream());
- log.info().attr("connected", clients[i]).log("Connected to");
- }
- start();
- }
-}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
index 826060dc6b7..d187ba97f37 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
@@ -50,8 +50,6 @@ public class PulsarPerfTestTool {
commandMap.put("transaction", PerformanceTransaction.class);
commandMap.put("read", PerformanceReader.class);
commandMap.put("monitor-brokers", BrokerMonitor.class);
- commandMap.put("simulation-client", LoadSimulationClient.class);
- commandMap.put("simulation-controller",
LoadSimulationController.class);
commandMap.put("websocket-producer", PerformanceClient.class);
commandMap.put("managed-ledger", ManagedLedgerWriter.class);
commandMap.put("gen-doc", CmdGenerateDocumentation.class);