This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fa1a45aa4 [cleanup][misc] Remove obsolete LoadSimulationClient and 
LoadSimulationController (#25926)
67fa1a45aa4 is described below

commit 67fa1a45aa44042ea8a58978752669129df42280
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 01:59:08 2026 -0700

    [cleanup][misc] Remove obsolete LoadSimulationClient and 
LoadSimulationController (#25926)
---
 .../testclient/CmdGenerateDocumentation.java       |   2 -
 .../pulsar/testclient/LoadSimulationClient.java    | 368 -----------
 .../testclient/LoadSimulationController.java       | 711 ---------------------
 .../pulsar/testclient/PulsarPerfTestTool.java      |   2 -
 4 files changed, 1083 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
index 41f002b80c8..0b47e2e95a4 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdGenerateDocumentation.java
@@ -55,8 +55,6 @@ public class CmdGenerateDocumentation extends CmdBase{
         cmdClassMap.put("transaction", PerformanceTransaction.class);
         cmdClassMap.put("read", PerformanceReader.class);
         cmdClassMap.put("monitor-brokers", BrokerMonitor.class);
-        cmdClassMap.put("simulation-client", LoadSimulationClient.class);
-        cmdClassMap.put("simulation-controller", 
LoadSimulationController.class);
         cmdClassMap.put("websocket-producer", PerformanceClient.class);
         cmdClassMap.put("managed-ledger", ManagedLedgerWriter.class);
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
deleted file mode 100644
index f5aeba623ed..00000000000
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient;
-
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.re2j.Pattern;
-import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import lombok.CustomLog;
-import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SizeUnit;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-
-/**
- * LoadSimulationClient is used to simulate client load by maintaining 
producers and consumers for topics. Instances of
- * this class are controlled across a network via LoadSimulationController.
- */
-@Command(name = "simulation-client",
-        description = "Simulate client load by maintaining producers and 
consumers for topics.")
-@CustomLog
-public class LoadSimulationClient extends CmdBase{
-
-    // Values for command encodings.
-    public static final byte CHANGE_COMMAND = 0;
-    public static final byte STOP_COMMAND = 1;
-    public static final byte TRADE_COMMAND = 2;
-    public static final byte CHANGE_GROUP_COMMAND = 3;
-    public static final byte STOP_GROUP_COMMAND = 4;
-    public static final byte FIND_COMMAND = 5;
-
-    private ExecutorService executor;
-    // Map from a message size to a cached byte[] of that size.
-    private final Map<Integer, byte[]> payloadCache;
-
-    // Map from a full topic name to the TradeUnit created for that topic.
-    private final Map<String, TradeUnit> topicsToTradeUnits;
-
-    // Pulsar admin to create namespaces with.
-    private PulsarAdmin admin;
-
-    // Pulsar client to create producers and consumers with.
-    private PulsarClient client;
-
-    // A TradeUnit is a Consumer and Producer pair. The rate of message
-    // consumption as well as size may be changed at
-    // any time, and the TradeUnit may also be stopped.
-    private static class TradeUnit {
-        Future<Consumer<byte[]>> consumerFuture;
-        final AtomicBoolean stop;
-        final RateLimiter rateLimiter;
-
-        // Creating a byte[] for every message is stressful for a client
-        // machine, so in order to ensure that any
-        // message size may be sent/changed while reducing object creation, the
-        // byte[] is wrapped in an AtomicReference.
-        final AtomicReference<byte[]> payload;
-        final PulsarClient client;
-        final String topic;
-        final Map<Integer, byte[]> payloadCache;
-
-        public TradeUnit(final TradeConfiguration tradeConf, final 
PulsarClient client,
-                final Map<Integer, byte[]> payloadCache) throws Exception {
-            consumerFuture = client.newConsumer()
-                    .topic(tradeConf.topic)
-                    .subscriptionName("Subscriber-" + tradeConf.topic)
-                    .messageListener(ackListener)
-                    .subscribeAsync();
-            this.payload = new AtomicReference<>();
-            this.payloadCache = payloadCache;
-            this.client = client;
-            topic = tradeConf.topic;
-
-            // Add a byte[] of the appropriate size if it is not already 
present
-            // in the cache.
-            this.payload.set(payloadCache.computeIfAbsent(tradeConf.size, 
byte[]::new));
-            rateLimiter = RateLimiter.create(tradeConf.rate);
-            stop = new AtomicBoolean(false);
-        }
-
-        // Change the message rate/size according to the given configuration.
-        public void change(final TradeConfiguration tradeConf) {
-            rateLimiter.setRate(tradeConf.rate);
-            this.payload.set(payloadCache.computeIfAbsent(tradeConf.size, 
byte[]::new));
-        }
-
-        // Attempt to create a Producer indefinitely. Useful for ensuring
-        // messages continue to be sent after broker
-        // restarts occur.
-        private Producer<byte[]> getNewProducer() throws Exception {
-            while (!Thread.currentThread().isInterrupted()) {
-                try {
-                    return client.newProducer()
-                                .topic(topic)
-                                .sendTimeout(0, TimeUnit.SECONDS)
-                                .create();
-
-                } catch (Exception e) {
-                    Thread.sleep(10000);
-                }
-            }
-            throw new InterruptedException();
-        }
-
-        private class MutableBoolean {
-            public volatile boolean value = true;
-        }
-
-        public void start() throws Exception {
-            Producer<byte[]> producer = getNewProducer();
-            final Consumer<byte[]> consumer = consumerFuture.get();
-            while (!stop.get()) {
-                final MutableBoolean wellnessFlag = new MutableBoolean();
-                final Function<Throwable, ? extends MessageId> 
exceptionHandler = e -> {
-                    // Unset the well flag in the case of an exception so we 
can
-                    // try to get a new Producer.
-                    wellnessFlag.value = false;
-                    if (PerfClientUtils.hasInterruptedException(e)) {
-                        Thread.currentThread().interrupt();
-                    }
-                    return null;
-                };
-                while (!stop.get() && wellnessFlag.value) {
-                    
producer.sendAsync(payload.get()).exceptionally(exceptionHandler);
-                    rateLimiter.acquire();
-                }
-                producer.closeAsync();
-                if (!stop.get()) {
-                    // The Producer failed due to an exception: attempt to get
-                    // another producer.
-                    producer = getNewProducer();
-                } else {
-                    // We are finished: close the consumer.
-                    consumer.closeAsync();
-                }
-            }
-        }
-    }
-
-    // picocli arguments for starting a LoadSimulationClient.
-
-    @Option(names = { "--port" }, description = "Port to listen on for 
controller", required = true)
-    public int port;
-
-    @Option(names = { "--service-url" }, description = "Pulsar Service URL", 
required = true)
-    public String serviceURL;
-
-    @Option(names = { "-ml", "--memory-limit", }, description = "Configure the 
Pulsar client memory limit "
-        + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
-    public long memoryLimit = 0L;
-
-
-    // Configuration class for initializing or modifying TradeUnits.
-    private static class TradeConfiguration {
-        public byte command;
-        public String topic;
-        public double rate;
-        public int size;
-        public String tenant;
-        public String group;
-
-        public TradeConfiguration() {
-            command = -1;
-            rate = 100;
-            size = 1024;
-        }
-    }
-
-    // Handle input sent from a controller.
-    private void handle(final Socket socket) throws Exception {
-        final DataInputStream inputStream = new 
DataInputStream(socket.getInputStream());
-        int command;
-        while ((command = inputStream.read()) != -1) {
-            handle((byte) command, inputStream, new 
DataOutputStream(socket.getOutputStream()));
-        }
-    }
-
-    // Decode TradeConfiguration fields common for topic creation and
-    // modification.
-    private void decodeProducerOptions(final TradeConfiguration tradeConf, 
final DataInputStream inputStream)
-            throws Exception {
-        tradeConf.topic = inputStream.readUTF();
-        tradeConf.size = inputStream.readInt();
-        tradeConf.rate = inputStream.readDouble();
-    }
-
-    // Decode TradeConfiguration fields common for group commands.
-    private void decodeGroupOptions(final TradeConfiguration tradeConf, final 
DataInputStream inputStream)
-            throws Exception {
-        tradeConf.tenant = inputStream.readUTF();
-        tradeConf.group = inputStream.readUTF();
-    }
-
-    // Handle a command sent from a controller.
-    private void handle(final byte command, final DataInputStream inputStream, 
final DataOutputStream outputStream)
-            throws Exception {
-        final TradeConfiguration tradeConf = new TradeConfiguration();
-        tradeConf.command = command;
-        switch (command) {
-        case CHANGE_COMMAND:
-            // Change the topic's settings if it exists.
-            decodeProducerOptions(tradeConf, inputStream);
-            if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
-                topicsToTradeUnits.get(tradeConf.topic).change(tradeConf);
-            }
-            break;
-        case STOP_COMMAND:
-            // Stop the topic if it exists.
-            tradeConf.topic = inputStream.readUTF();
-            if (topicsToTradeUnits.containsKey(tradeConf.topic)) {
-                topicsToTradeUnits.get(tradeConf.topic).stop.set(true);
-            }
-            break;
-        case TRADE_COMMAND:
-            // Create the topic. It is assumed that the topic does not already 
exist.
-            decodeProducerOptions(tradeConf, inputStream);
-            final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, 
payloadCache);
-            topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
-            executor.submit(() -> {
-                try {
-                    final String topic = tradeConf.topic;
-                    final String namespace = 
topic.substring("persistent://".length(), topic.lastIndexOf('/'));
-                    try {
-                        admin.namespaces().createNamespace(namespace);
-                    } catch (PulsarAdminException.ConflictException e) {
-                        // Ignore, already created namespace.
-                    }
-                    tradeUnit.start();
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-            });
-            break;
-        case CHANGE_GROUP_COMMAND:
-            // Change the settings of all topics belonging to a group.
-            decodeGroupOptions(tradeConf, inputStream);
-            tradeConf.size = inputStream.readInt();
-            tradeConf.rate = inputStream.readDouble();
-            // See if a topic belongs to this tenant and group using this 
regex.
-            final Pattern groupRegex =
-                    Pattern.compile(".*://" + tradeConf.tenant + "/.*/" + 
tradeConf.group + "-.*/.*");
-
-            for (Map.Entry<String, TradeUnit> entry : 
topicsToTradeUnits.entrySet()) {
-                final String topic = entry.getKey();
-                final TradeUnit unit = entry.getValue();
-
-                if (groupRegex.matcher(topic).matches()) {
-                    unit.change(tradeConf);
-                }
-            }
-            break;
-        case STOP_GROUP_COMMAND:
-            // Stop all topics belonging to a group.
-            decodeGroupOptions(tradeConf, inputStream);
-            // See if a topic belongs to this tenant and group using this 
regex.
-            final Pattern regex = Pattern.compile(".*://" + tradeConf.tenant + 
"/.*/" + tradeConf.group + "-.*/.*");
-            for (Map.Entry<String, TradeUnit> entry : 
topicsToTradeUnits.entrySet()) {
-                final String topic = entry.getKey();
-                final TradeUnit unit = entry.getValue();
-                if (regex.matcher(topic).matches()) {
-                    unit.stop.set(true);
-                }
-            }
-            break;
-        case FIND_COMMAND:
-            // Write a single boolean indicating if the topic was found.
-            
outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF()));
-            outputStream.flush();
-            break;
-        default:
-            throw new IllegalArgumentException("Unrecognized command code 
received: " + command);
-        }
-    }
-
-    // Make listener as lightweight as possible.
-    private static final MessageListener<byte[]> ackListener = 
Consumer::acknowledgeAsync;
-
-    /**
-     * Create a LoadSimulationClient with the given picocli this.
-     *
-     */
-    public LoadSimulationClient() throws PulsarClientException {
-        super("simulation-client");
-        payloadCache = new ConcurrentHashMap<>();
-        topicsToTradeUnits = new ConcurrentHashMap<>();
-    }
-
-    /**
-     * Start a client with command line this.
-     *
-     */
-    @Override
-    @SuppressWarnings("deprecation")
-    public void run() throws Exception {
-        admin = PulsarAdmin.builder()
-                .serviceHttpUrl(this.serviceURL)
-                .build();
-        client = PulsarClient.builder()
-                .memoryLimit(this.memoryLimit, SizeUnit.BYTES)
-                .serviceUrl(this.serviceURL)
-                .connectionsPerBroker(4)
-                .ioThreads(Runtime.getRuntime().availableProcessors())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .build();
-        executor = Executors.newCachedThreadPool(new 
DefaultThreadFactory("test-client"));
-        PerfClientUtils.printJVMInformation(log);
-        this.start();
-    }
-
-    /**
-     * Start listening for controller commands to create producers and 
consumers.
-     */
-    public void start() throws Exception {
-        final ServerSocket serverSocket = new ServerSocket(port);
-
-        while (!Thread.currentThread().isInterrupted()) {
-            // Technically, two controllers can be connected simultaneously, 
but
-            // non-sequential handling of commands
-            // has not been tested or considered and is not recommended.
-            log.info("Listening for controller command...");
-            final Socket socket = serverSocket.accept();
-            log.info().attr("connected", 
socket.getInetAddress().getHostName()).log("Connected to");
-            executor.submit(() -> {
-                try {
-                    handle(socket);
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-            });
-        }
-    }
-}
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
deleted file mode 100644
index 89ee07f46cd..00000000000
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.testclient;
-
-import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
-import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.RESOURCE_QUOTA_BASE_PATH;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.net.Socket;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import lombok.CustomLog;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.loadbalance.LoadManager;
-import org.apache.pulsar.common.policies.data.ResourceQuota;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.policies.data.loadbalancer.BundleData;
-import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
-import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import picocli.CommandLine;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-import picocli.CommandLine.Parameters;
-
-/**
- * This class provides a shell for the user to dictate how simulation clients 
should incur load.
- */
-@Command(name = "simulation-controller",
-        description = "Provides a shell for the user to dictate how simulation 
clients should "
-        + "incur load.")
-@CustomLog
-public class LoadSimulationController extends CmdBase{
-
-    // Input streams for each client to send commands through.
-    private DataInputStream[] inputStreams;
-
-    // Output streams for each client to receive information from.
-    private DataOutputStream[] outputStreams;
-
-    // client host names.
-    private String[] clients;
-
-    private Random random;
-
-    private static final ExecutorService threadPool = 
Executors.newCachedThreadPool();
-
-    // picocli arguments for starting a controller via main.
-
-    @Option(names = { "--cluster" }, description = "Cluster to test on", 
required = true)
-    String cluster;
-
-    @Option(names = { "--clients" }, description = "Comma separated list of 
client hostnames", required = true)
-    String clientHostNames;
-
-    @Option(names = { "--client-port" }, description = "Port that the clients 
are listening on", required = true)
-    int clientPort;
-
-
-    // picocli arguments for accepting user input.
-    private static class ShellArguments {
-        @Parameters(description = "Command arguments:\n" + "trade tenant 
namespace topic\n"
-                + "change tenant namespace topic\n" + "stop tenant namespace 
topic\n"
-                + "trade_group tenant group_name num_namespaces\n" + 
"change_group tenant group_name\n"
-                + "stop_group tenant group_name\n" + "script script_name\n" + 
"copy tenant_name source_zk target_zk\n"
-                + "stream source_zk\n" + "simulate zk\n", arity = "1")
-        List<String> commandArguments;
-
-        @Option(names = { "--rand-rate" }, description = "Choose message rate 
uniformly randomly from the next two "
-                + "comma separated values (overrides --rate)")
-        String rangeString = "";
-
-        @Option(names = { "--rate" }, description = "Messages per second")
-        double rate = 1;
-
-        @Option(names = { "--rate-multiplier" }, description = "Multiplier to 
use for copying or streaming rates")
-        double rateMultiplier = 1;
-
-        @Option(names = { "--separation" }, description = "Separation time in 
ms for trade_group actions "
-                + "(0 for no separation)")
-        int separation = 0;
-
-        @Option(names = { "--size" }, description = "Message size in bytes")
-        int size = 1024;
-
-        @Option(names = { "--topics-per-namespace" }, description = "Number of 
topics to create per namespace in "
-                + "trade_group (total number of topics is num_namespaces X 
num_topics)")
-        int topicsPerNamespace = 1;
-    }
-
-    // In stream mode, the BrokerWatcher watches the /loadbalance/broker zpath 
and adds LoadReportWatchers accordingly
-    // when new brokers come up.
-    private class BrokerWatcher implements Watcher {
-        private final ZooKeeper zkClient;
-
-        // Currently observed brokers.
-        private final Set<String> brokers;
-
-        // Shell arguments to configure streaming with.
-        private final ShellArguments arguments;
-
-        private BrokerWatcher(final ZooKeeper zkClient, final ShellArguments 
arguments) {
-            this.zkClient = zkClient;
-            this.arguments = arguments;
-            brokers = new HashSet<>();
-            // Observe the currently active brokers and put a watch on the 
broker root.
-            process(null);
-        }
-
-        // Add load report watchers for newly observed brokers.
-        public synchronized void process(final WatchedEvent event) {
-            try {
-                final List<String> currentBrokers = 
zkClient.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT, this);
-                for (final String broker : currentBrokers) {
-                    if (!brokers.contains(broker)) {
-                        new LoadReportWatcher(String.format("%s/%s", 
LoadManager.LOADBALANCE_BROKERS_ROOT, broker),
-                                zkClient, arguments);
-                        brokers.add(broker);
-                    }
-                }
-            } catch (Exception ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    // In stream mode, the LoadReportWatcher watches the /loadbalance/broker 
children and adds or modifies topics with
-    // suitable rates based on the most recent message rate and throughput 
information.
-    private class LoadReportWatcher implements Watcher {
-        private final ZooKeeper zkClient;
-        private final String path;
-        private final ShellArguments arguments;
-
-        public LoadReportWatcher(final String path, final ZooKeeper zkClient, 
final ShellArguments arguments) {
-            this.path = path;
-            this.zkClient = zkClient;
-            this.arguments = arguments;
-            // Get initial topics and set this up as a watch by calling 
process.
-            process(null);
-        }
-
-        // Update the message rate information for the bundles in a recently 
changed load report.
-        public synchronized void process(final WatchedEvent event) {
-            try {
-                // Get the load report and put this back as a watch.
-                final LoadReport loadReport = 
ObjectMapperFactory.getMapper().getObjectMapper()
-                        .readValue(zkClient.getData(path, this, null), 
LoadReport.class);
-                for (final Map.Entry<String, NamespaceBundleStats> entry : 
loadReport.getBundleStats().entrySet()) {
-                    final String bundle = entry.getKey();
-                    final String namespace = bundle.substring(0, 
bundle.lastIndexOf('/'));
-                    final String topic = String.format("%s/%s", namespace, 
"t");
-                    final NamespaceBundleStats stats = entry.getValue();
-
-                    // Approximate total message rate via average between 
in/out.
-                    final double messageRate = arguments.rateMultiplier * 
(stats.msgRateIn + stats.msgRateOut) / 2;
-
-                    // size = throughput / rate.
-                    final int messageSize = (int) 
Math.ceil(arguments.rateMultiplier
-                            * (stats.msgThroughputIn + stats.msgThroughputOut) 
/ (2 * messageRate));
-
-                    arguments.rate = messageRate;
-                    arguments.size = messageSize;
-                    // Try to modify the topic if it already exists. 
Otherwise, create it.
-                    changeOrCreate(arguments, topic);
-                }
-            } catch (Exception ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    /**
-     * Create a LoadSimulationController with the given picocli arguments.
-     *
-     */
-    public LoadSimulationController() throws Exception {
-        super("simulation-controller");
-    }
-
-    // Check that the expected number of application arguments matches the
-    // actual number of application arguments.
-    private boolean checkAppArgs(final int numAppArgs, final int numRequired) {
-        if (numAppArgs != numRequired) {
-            log.info()
-                    .attr("found", numAppArgs)
-                    .attr("required", numRequired)
-                    .log("ERROR: Wrong number of application arguments (found 
, required");
-            return false;
-        }
-        return true;
-    }
-
-    // Recursively acquire all resource quotas by getting the ZK children of 
the given path and calling this function
-    // on the children if there are any, or getting the data from this ZNode 
otherwise.
-    private void getResourceQuotas(final String path, final ZooKeeper zkClient,
-            final Map<String, ResourceQuota>[] threadLocalMaps) throws 
Exception {
-        final List<String> children = zkClient.getChildren(path, false);
-        if (children.isEmpty()) {
-            threadLocalMaps[random.nextInt(clients.length)].put(path, 
ObjectMapperFactory.getMapper().getObjectMapper()
-                    .readValue(zkClient.getData(path, false, null), 
ResourceQuota.class));
-        } else {
-            for (final String child : children) {
-                getResourceQuotas(String.format("%s/%s", path, child), 
zkClient, threadLocalMaps);
-            }
-        }
-    }
-
-    // Initialize a BundleData from a resource quota and configurations and 
modify the quota accordingly.
-    private BundleData initializeBundleData(final ResourceQuota quota, final 
ShellArguments arguments) {
-        final double messageRate = (quota.getMsgRateIn() + 
quota.getMsgRateOut()) / 2;
-        final int messageSize = (int) Math.ceil((quota.getBandwidthIn() + 
quota.getBandwidthOut()) / (2 * messageRate));
-        arguments.rate = messageRate * arguments.rateMultiplier;
-        arguments.size = messageSize;
-        final NamespaceBundleStats startingStats = new NamespaceBundleStats();
-
-        // Modify the original quota so that new rates are set.
-        final double modifiedRate = messageRate * arguments.rateMultiplier;
-        final double modifiedBandwidth = (quota.getBandwidthIn() + 
quota.getBandwidthOut()) * arguments.rateMultiplier
-                / 2;
-        quota.setMsgRateIn(modifiedRate);
-        quota.setMsgRateOut(modifiedRate);
-        quota.setBandwidthIn(modifiedBandwidth);
-        quota.setBandwidthOut(modifiedBandwidth);
-
-        // Assume modified memory usage is comparable to the rate multiplier 
times the original usage.
-        quota.setMemory(quota.getMemory() * arguments.rateMultiplier);
-        startingStats.msgRateIn = quota.getMsgRateIn();
-        startingStats.msgRateOut = quota.getMsgRateOut();
-        startingStats.msgThroughputIn = quota.getBandwidthIn();
-        startingStats.msgThroughputOut = quota.getBandwidthOut();
-        final BundleData bundleData = new BundleData(10, 1000, startingStats);
-        // Assume there is ample history for the bundle.
-        bundleData.getLongTermData().setNumSamples(1000);
-        bundleData.getShortTermData().setNumSamples(10);
-        return bundleData;
-    }
-
-    // Makes a topic string from a tenant name, namespace name, and topic
-    // name.
-    private String makeTopic(final String tenant, final String namespace, 
final String topic) {
-        return String.format("persistent://%s/%s/%s/%s", tenant, cluster, 
namespace, topic);
-    }
-
-    // Write options that are common to modifying and creating topics.
-    private void writeProducerOptions(final DataOutputStream outputStream, 
final ShellArguments arguments,
-            final String topic) throws Exception {
-        if (!arguments.rangeString.isEmpty()) {
-            // If --rand-rate was specified, extract the bounds by splitting on
-            // the comma and parsing the resulting
-            // doubles.
-            final String[] splits = arguments.rangeString.split(",");
-            if (splits.length != 2) {
-                log.error("Argument to --rand-rate should be two 
comma-separated values");
-                return;
-            }
-            final double first = Double.parseDouble(splits[0]);
-            final double second = Double.parseDouble(splits[1]);
-            final double min = Math.min(first, second);
-            final double max = Math.max(first, second);
-            arguments.rate = random.nextDouble() * (max - min) + min;
-        }
-        outputStream.writeUTF(topic);
-        outputStream.writeInt(arguments.size);
-        outputStream.writeDouble(arguments.rate);
-    }
-
-    // Change producer settings for a given topic and picocli arguments.
-    private void change(final ShellArguments arguments, final String topic, 
final int client) throws Exception {
-        outputStreams[client].write(LoadSimulationClient.CHANGE_COMMAND);
-        writeProducerOptions(outputStreams[client], arguments, topic);
-        outputStreams[client].flush();
-    }
-
-    // Change an existing topic, or create it if it does not exist.
-    private int changeOrCreate(final ShellArguments arguments, final String 
topic) throws Exception {
-        final int client = find(topic);
-        if (client == -1) {
-            trade(arguments, topic, random.nextInt(clients.length));
-        } else {
-            change(arguments, topic, client);
-        }
-        return client;
-    }
-
-    // Find a topic and change it if it exists.
-    private int changeIfExists(final ShellArguments arguments, final String 
topic) throws Exception {
-        final int client = find(topic);
-        if (client != -1) {
-            change(arguments, topic, client);
-        }
-        return client;
-    }
-
-    // Attempt to find a topic on the clients.
-    private int find(final String topic) throws Exception {
-        int clientWithTopic = -1;
-        for (int i = 0; i < clients.length; ++i) {
-            outputStreams[i].write(LoadSimulationClient.FIND_COMMAND);
-            outputStreams[i].writeUTF(topic);
-        }
-        for (int i = 0; i < clients.length; ++i) {
-            if (inputStreams[i].readBoolean()) {
-                clientWithTopic = i;
-            }
-        }
-        return clientWithTopic;
-    }
-
-    // Trade using the arguments parsed via picocli and the topic name.
-    private synchronized void trade(final ShellArguments arguments, final 
String topic, final int client)
-            throws Exception {
-        // Decide which client to send to randomly to preserve statelessness of
-        // the controller.
-        outputStreams[client].write(LoadSimulationClient.TRADE_COMMAND);
-        writeProducerOptions(outputStreams[client], arguments, topic);
-        outputStreams[client].flush();
-    }
-
-    // Handle the command line arguments associated with the change command.
-    private void handleChange(final ShellArguments arguments) throws Exception 
{
-        final List<String> commandArguments = arguments.commandArguments;
-        // Change expects three application arguments: tenant name, namespace 
name, and topic name.
-        if (checkAppArgs(commandArguments.size() - 1, 3)) {
-            final String topic = makeTopic(commandArguments.get(1), 
commandArguments.get(2),
-                    commandArguments.get(3));
-            if (changeIfExists(arguments, topic) == -1) {
-                log.info().attr("topic", topic).log("Topic not found");
-            }
-        }
-    }
-
-    // Handle the command line arguments associated with the copy command.
-    @SuppressWarnings("unchecked")
-    private void handleCopy(final ShellArguments arguments) throws Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Copy accepts 3 application arguments: Tenant name, source ZooKeeper 
and target ZooKeeper connect strings.
-        if (checkAppArgs(commandArguments.size() - 1, 3)) {
-            final String tenantName = commandArguments.get(1);
-            final String sourceZKConnectString = commandArguments.get(2);
-            final String targetZKConnectString = commandArguments.get(3);
-            final ZooKeeper sourceZKClient = new 
ZooKeeper(sourceZKConnectString, 5000, null);
-            final ZooKeeper targetZKClient = new 
ZooKeeper(targetZKConnectString, 5000, null);
-            // Make a map for each thread to speed up the ZooKeeper writing 
process.
-            final Map<String, ResourceQuota>[] threadLocalMaps = new 
Map[clients.length];
-            for (int i = 0; i < clients.length; ++i) {
-                threadLocalMaps[i] = new HashMap<>();
-            }
-            getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, sourceZKClient, 
threadLocalMaps);
-            final List<Future> futures = new ArrayList<>(clients.length);
-            int i = 0;
-            log.info("Copying...");
-            for (final Map<String, ResourceQuota> bundleToQuota : 
threadLocalMaps) {
-                final int j = i;
-                futures.add(threadPool.submit(() -> {
-                    for (final Map.Entry<String, ResourceQuota> entry : 
bundleToQuota.entrySet()) {
-                        final String bundle = entry.getKey();
-                        final ResourceQuota quota = entry.getValue();
-                        // Simulation will send messages in and out at about 
the same rate, so just make the rate the
-                        // average of in and out.
-
-                        final int tenantStart = 
RESOURCE_QUOTA_BASE_PATH.length() + 1;
-                        final int clusterStart = bundle.indexOf('/', 
tenantStart) + 1;
-                        final String sourceTenant = 
bundle.substring(tenantStart, clusterStart - 1);
-                        final int namespaceStart = bundle.indexOf('/', 
clusterStart) + 1;
-                        final String sourceCluster = 
bundle.substring(clusterStart, namespaceStart - 1);
-                        final String namespace = 
bundle.substring(namespaceStart, bundle.lastIndexOf('/'));
-                        final String keyRangeString = 
bundle.substring(bundle.lastIndexOf('/') + 1);
-                        // To prevent duplicate node issues for same namespace 
names in different clusters/tenants.
-                        final String manglePrefix = String.format("%s-%s-%s", 
sourceCluster, sourceTenant,
-                                keyRangeString);
-                        final String mangledNamespace = String.format("%s-%s", 
manglePrefix, namespace);
-                        final BundleData bundleData = 
initializeBundleData(quota, arguments);
-                        final String oldAPITargetPath = String.format(
-                                "%s/namespace/%s/%s/%s/0x00000000_0xffffffff", 
BUNDLE_DATA_BASE_PATH, tenantName,
-                                cluster, mangledNamespace);
-                        final String newAPITargetPath = String.format(
-                                "%s/%s/%s/%s/0x00000000_0xffffffff", 
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
-                                mangledNamespace);
-                        try {
-                            ZkUtils.createFullPathOptimistic(targetZKClient, 
oldAPITargetPath,
-                                    
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(quota),
-                                    ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-                        } catch (KeeperException.NodeExistsException e) {
-                            // Ignore already created nodes.
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                        // Put the bundle data in the new ZooKeeper.
-                        try {
-                            ZkUtils.createFullPathOptimistic(targetZKClient, 
newAPITargetPath,
-                                    
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData),
-                                    ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-                        } catch (KeeperException.NodeExistsException e) {
-                            // Ignore already created nodes.
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                        try {
-                            trade(arguments, makeTopic(tenantName, 
mangledNamespace, "t"), j);
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }));
-                ++i;
-            }
-            for (final Future future : futures) {
-                future.get();
-            }
-            sourceZKClient.close();
-            targetZKClient.close();
-        }
-    }
-
-    // Handle the command line arguments associated with the simulate command.
-    @SuppressWarnings("unchecked")
-    private void handleSimulate(final ShellArguments arguments) throws 
Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        checkAppArgs(commandArguments.size() - 1, 1);
-        final ZooKeeper zkClient = new ZooKeeper(commandArguments.get(1), 
5000, null);
-        // Make a map for each thread to speed up the ZooKeeper writing 
process.
-        final Map<String, ResourceQuota>[] threadLocalMaps = new 
Map[clients.length];
-        for (int i = 0; i < clients.length; ++i) {
-            threadLocalMaps[i] = new HashMap<>();
-        }
-        getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, zkClient, threadLocalMaps);
-        final List<Future> futures = new ArrayList<>(clients.length);
-        int i = 0;
-        log.info("Simulating...");
-        for (final Map<String, ResourceQuota> bundleToQuota : threadLocalMaps) 
{
-            final int j = i;
-            futures.add(threadPool.submit(() -> {
-                for (final Map.Entry<String, ResourceQuota> entry : 
bundleToQuota.entrySet()) {
-                    final String bundle = entry.getKey();
-                    final String newAPIPath = 
bundle.replace(RESOURCE_QUOTA_BASE_PATH, BUNDLE_DATA_BASE_PATH);
-                    final ResourceQuota quota = entry.getValue();
-                    final int tenantStart = RESOURCE_QUOTA_BASE_PATH.length() 
+ 1;
-                    final String topic = String.format("persistent://%s/t", 
bundle.substring(tenantStart));
-                    final BundleData bundleData = initializeBundleData(quota, 
arguments);
-                    // Put the bundle data in the new ZooKeeper.
-                    try {
-                        ZkUtils.createFullPathOptimistic(zkClient, newAPIPath,
-                                
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData),
-                                ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-                    } catch (KeeperException.NodeExistsException e) {
-                        try {
-                            zkClient.setData(newAPIPath,
-                                    
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(bundleData), -1);
-                        } catch (Exception ex) {
-                            throw new RuntimeException(ex);
-                        }
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                    try {
-                        trade(arguments, topic, j);
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }));
-            ++i;
-        }
-        for (final Future future : futures) {
-            future.get();
-        }
-        zkClient.close();
-    }
-
-    // Handle the command line arguments associated with the stop command.
-    private void handleStop(final ShellArguments arguments) throws Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Stop expects three application arguments: tenant name, namespace
-        // name, and topic name.
-        if (checkAppArgs(commandArguments.size() - 1, 3)) {
-            final String topic = makeTopic(commandArguments.get(1), 
commandArguments.get(2),
-                    commandArguments.get(3));
-            for (DataOutputStream outputStream : outputStreams) {
-                outputStream.write(LoadSimulationClient.STOP_COMMAND);
-                outputStream.writeUTF(topic);
-                outputStream.flush();
-            }
-        }
-    }
-
-    // Handle the command line arguments associated with the stream command.
-    private void handleStream(final ShellArguments arguments) throws Exception 
{
-        final List<String> commandArguments = arguments.commandArguments;
-        // Stream accepts 1 application argument: ZooKeeper connect string.
-        if (checkAppArgs(commandArguments.size() - 1, 1)) {
-            final String zkConnectString = commandArguments.get(1);
-            final ZooKeeper zkClient = new ZooKeeper(zkConnectString, 5000, 
null);
-            new BrokerWatcher(zkClient, arguments);
-            // This controller will now stream rate changes from the given ZK.
-            // Users wishing to stop this should Ctrl + C and use another
-            // Controller to send new commands.
-            Thread.currentThread().join();
-        }
-    }
-
-    // Handle the command line arguments associated with the trade command.
-    private void handleTrade(final ShellArguments arguments) throws Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Trade expects three application arguments: tenant, namespace, and
-        // topic.
-        if (checkAppArgs(commandArguments.size() - 1, 3)) {
-            final String topic = makeTopic(commandArguments.get(1), 
commandArguments.get(2),
-                    commandArguments.get(3));
-            trade(arguments, topic, random.nextInt(clients.length));
-        }
-    }
-
-    // Handle the command line arguments associated with the group change 
command.
-    private void handleGroupChange(final ShellArguments arguments) throws 
Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Group change expects two application arguments: tenant name and 
group
-        // name.
-        if (checkAppArgs(commandArguments.size() - 1, 2)) {
-            final String tenant = commandArguments.get(1);
-            final String group = commandArguments.get(2);
-            for (DataOutputStream outputStream : outputStreams) {
-                outputStream.write(LoadSimulationClient.CHANGE_GROUP_COMMAND);
-                outputStream.writeUTF(tenant);
-                outputStream.writeUTF(group);
-                outputStream.writeInt(arguments.size);
-                outputStream.writeDouble(arguments.rate);
-                outputStream.flush();
-            }
-        }
-    }
-
-    // Handle the command line arguments associated with the group stop 
command.
-    private void handleGroupStop(final ShellArguments arguments) throws 
Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Group stop requires two application arguments: tenant name and group
-        // name.
-        if (checkAppArgs(commandArguments.size() - 1, 2)) {
-            final String tenant = commandArguments.get(1);
-            final String group = commandArguments.get(2);
-            for (DataOutputStream outputStream : outputStreams) {
-                outputStream.write(LoadSimulationClient.STOP_GROUP_COMMAND);
-                outputStream.writeUTF(tenant);
-                outputStream.writeUTF(group);
-                outputStream.flush();
-            }
-        }
-    }
-
-    // Handle the command line arguments associated with the group trade 
command.
-    private void handleGroupTrade(final ShellArguments arguments) throws 
Exception {
-        final List<String> commandArguments = arguments.commandArguments;
-        // Group trade expects 3 application arguments: tenant name, group 
name,
-        // and number of namespaces.
-        if (checkAppArgs(commandArguments.size() - 1, 3)) {
-            final String tenant = commandArguments.get(1);
-            final String group = commandArguments.get(2);
-            final int numNamespaces = 
Integer.parseInt(commandArguments.get(3));
-            for (int i = 0; i < numNamespaces; ++i) {
-                for (int j = 0; j < arguments.topicsPerNamespace; ++j) {
-                    // For each namespace and topic pair, create the namespace
-                    // by using the group name and the
-                    // namespace index, and then create the topic by using the
-                    // topic index. Then just call trade.
-                    final String topic = makeTopic(tenant, 
String.format("%s-%d", group, i),
-                            Integer.toString(j));
-                    trade(arguments, topic, random.nextInt(clients.length));
-                    Thread.sleep(arguments.separation);
-                }
-            }
-        }
-    }
-
-    /**
-     * Read the user-submitted arguments as commands to send to clients.
-     *
-     * @param args
-     *            Arguments split on whitespace from user input.
-     */
-    private void read(final String[] args) {
-        // Don't attempt to process blank input.
-        if (args.length > 0 && !(args.length == 1 && args[0].isEmpty())) {
-            final ShellArguments arguments = new ShellArguments();
-            final CommandLine commander = new CommandLine(arguments);
-            try {
-                commander.parseArgs(args);
-                final String command = arguments.commandArguments.get(0);
-                switch (command) {
-                case "trade":
-                    handleTrade(arguments);
-                    break;
-                case "change":
-                    handleChange(arguments);
-                    break;
-                case "stop":
-                    handleStop(arguments);
-                    break;
-                case "trade_group":
-                    handleGroupTrade(arguments);
-                    break;
-                case "change_group":
-                    handleGroupChange(arguments);
-                    break;
-                case "stop_group":
-                    handleGroupStop(arguments);
-                    break;
-                case "script":
-                    // Read input from the given script instead of stdin until
-                    // the script has executed completely.
-                    final List<String> commandArguments = 
arguments.commandArguments;
-                    checkAppArgs(commandArguments.size() - 1, 1);
-                    final String scriptName = commandArguments.get(1);
-                    try (BufferedReader scriptReader = new BufferedReader(
-                            new InputStreamReader(new 
FileInputStream(Paths.get(scriptName).toFile())))) {
-                        String line;
-                        while ((line = scriptReader.readLine()) != null) {
-                            read(line.split("\\s+"));
-                        }
-                    }
-                    break;
-                case "copy":
-                    handleCopy(arguments);
-                    break;
-                case "stream":
-                    handleStream(arguments);
-                    break;
-                case "simulate":
-                    handleSimulate(arguments);
-                    break;
-                case "quit":
-                case "exit":
-                    PerfClientUtils.exit(0);
-                    break;
-                default:
-                    log.info().attr("command", command).log("ERROR: Unknown 
command \" \"");
-                }
-            } catch (ParameterException ex) {
-                System.out.println(ex.getMessage());
-                commander.usage(commander.getOut());
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-    }
-
-    /**
-     * Create a shell for the user to send commands to clients.
-     */
-    public void start() throws Exception {
-        BufferedReader inReader = new BufferedReader(new 
InputStreamReader(System.in));
-        while (!Thread.currentThread().isInterrupted()) {
-            // Print the very simple prompt.
-            System.out.println();
-            System.out.print("> ");
-            read(inReader.readLine().split("\\s+"));
-        }
-    }
-
-    /**
-     * Start a controller with command line arguments.
-     *
-     */
-    @Override
-    public void run() throws Exception {
-        random = new Random();
-        clients = this.clientHostNames.split(",");
-        final Socket[] sockets = new Socket[clients.length];
-        inputStreams = new DataInputStream[clients.length];
-        outputStreams = new DataOutputStream[clients.length];
-        log.info().attr("found", clients.length).log("Found clients");
-        for (int i = 0; i < clients.length; ++i) {
-            sockets[i] = new Socket(clients[i], clientPort);
-            inputStreams[i] = new DataInputStream(sockets[i].getInputStream());
-            outputStreams[i] = new 
DataOutputStream(sockets[i].getOutputStream());
-            log.info().attr("connected", clients[i]).log("Connected to");
-        }
-        start();
-    }
-}
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
index 826060dc6b7..d187ba97f37 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PulsarPerfTestTool.java
@@ -50,8 +50,6 @@ public class PulsarPerfTestTool {
         commandMap.put("transaction", PerformanceTransaction.class);
         commandMap.put("read", PerformanceReader.class);
         commandMap.put("monitor-brokers", BrokerMonitor.class);
-        commandMap.put("simulation-client", LoadSimulationClient.class);
-        commandMap.put("simulation-controller", 
LoadSimulationController.class);
         commandMap.put("websocket-producer", PerformanceClient.class);
         commandMap.put("managed-ledger", ManagedLedgerWriter.class);
         commandMap.put("gen-doc", CmdGenerateDocumentation.class);

Reply via email to