This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new caa493ba583 [improve][test] Migrate pulsar-perf to the V5 client API
(#25887)
caa493ba583 is described below
commit caa493ba58389680a86579fe78274976768ceaa9
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 1 18:34:03 2026 -0700
[improve][test] Migrate pulsar-perf to the V5 client API (#25887)
---
.../pulsar/client/api/v5/QueueConsumerBuilder.java | 11 +
.../client/impl/v5/PulsarClientBuilderV5.java | 29 +-
.../client/impl/v5/QueueConsumerBuilderV5.java | 6 +
.../client/impl/v5/PulsarClientBuilderV5Test.java | 70 +++
pulsar-testclient/build.gradle.kts | 2 +
.../java/org/apache/pulsar/testclient/CmdBase.java | 5 +
.../apache/pulsar/testclient/PerfClientUtils.java | 95 ++++
.../pulsar/testclient/PerformanceConsumer.java | 501 ++++++++++++---------
.../pulsar/testclient/PerformanceProducer.java | 181 ++++----
.../pulsar/testclient/PerformanceReader.java | 156 ++++---
.../pulsar/testclient/PerformanceTransaction.java | 283 ++++++------
.../pulsar/testclient/PerformanceProducerTest.java | 32 +-
12 files changed, 846 insertions(+), 525 deletions(-)
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index f616f79beee..f04c41804ff 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -177,6 +177,17 @@ public interface QueueConsumerBuilder<T> {
*/
QueueConsumerBuilder<T> negativeAckRedeliveryBackoff(BackoffPolicy
backoff);
+ /**
+ * Whether the subscription cursor should be replicated to other clusters
in a geo-replication
+ * setup. When {@code true}, the subscription state (acknowledgments) is
replicated alongside
+ * the topic messages, so a consumer on a different cluster can resume
from where this one
+ * left off after a failover. Defaults to {@code false}.
+ *
+ * @param replicate whether subscription state should be geo-replicated
+ * @return this builder instance for chaining
+ */
+ QueueConsumerBuilder<T> replicateSubscriptionState(boolean replicate);
+
// --- Dead letter queue ---
/**
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
index 07894ab43aa..8104c4bbc36 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
@@ -73,6 +73,16 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
throws PulsarClientException {
conf.setAuthPluginClassName(authPluginClassName);
conf.setAuthParams(authParamsString);
+ // Instantiate the Authentication and attach it to the conf — the v4
PulsarClientImpl
+ // honors the plugin class name + params strings only as metadata, it
reads the actual
+ // Authentication instance via conf.getAuthentication() at connect
time.
+ try {
+ conf.setAuthentication(
+ org.apache.pulsar.client.api.AuthenticationFactory.create(
+ authPluginClassName, authParamsString));
+ } catch
(org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException
e) {
+ throw new PulsarClientException(e.getMessage(), e);
+ }
return this;
}
@@ -113,10 +123,18 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
@Override
public PulsarClientBuilder tlsPolicy(TlsPolicy policy) {
- // TlsPolicy configures TLS settings
- // For now, just enable TLS — full TLS config adaptation will be
- // implemented when TlsPolicy internals are defined
conf.setUseTls(true);
+ if (policy.trustCertsFilePath() != null) {
+ conf.setTlsTrustCertsFilePath(policy.trustCertsFilePath());
+ }
+ if (policy.keyFilePath() != null) {
+ conf.setTlsKeyFilePath(policy.keyFilePath());
+ }
+ if (policy.certificateFilePath() != null) {
+ conf.setTlsCertificateFilePath(policy.certificateFilePath());
+ }
+ conf.setTlsAllowInsecureConnection(policy.allowInsecureConnection());
+
conf.setTlsHostnameVerificationEnable(policy.enableHostnameVerification());
return this;
}
@@ -145,6 +163,11 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
return this;
}
+ /** @return the underlying v4 configuration data; for tests in this
package only. */
+ ClientConfigurationData getConfForTesting() {
+ return conf;
+ }
+
/**
* Reject anything that isn't the broker binary protocol. The most common
* mistake is passing the admin/web service URL ({@code http://...}) where
a
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index c1d8bdf261f..14939e9d75f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -184,6 +184,12 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
return this;
}
+ @Override
+ public QueueConsumerBuilderV5<T> replicateSubscriptionState(boolean
replicate) {
+ conf.setReplicateSubscriptionState(replicate);
+ return this;
+ }
+
@Override
public QueueConsumerBuilderV5<T>
negativeAckRedeliveryBackoff(BackoffPolicy backoff) {
conf.setNegativeAckRedeliveryBackoff(
diff --git
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5Test.java
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5Test.java
index b8dc5202540..c081f6df6a0 100644
---
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5Test.java
+++
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5Test.java
@@ -18,12 +18,18 @@
*/
package org.apache.pulsar.client.impl.v5;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.client.api.v5.PulsarClient;
import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.TlsPolicy;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;
/**
@@ -95,6 +101,70 @@ public class PulsarClientBuilderV5Test {
"error must name the offending field: " + e.getMessage());
}
+ @Test
+ public void testTlsPolicyFieldsPropagate() {
+ // Build a fully-populated TlsPolicy and confirm every field lands on
the underlying
+ // v4 ClientConfigurationData. Used to be a stub that only set
useTls=true.
+ PulsarClientBuilderV5 builder = new PulsarClientBuilderV5();
+ TlsPolicy policy = TlsPolicy.builder()
+ .trustCertsFilePath("/path/to/ca.pem")
+ .keyFilePath("/path/to/client.key")
+ .certificateFilePath("/path/to/client.cert")
+ .allowInsecureConnection(true)
+ .enableHostnameVerification(false)
+ .build();
+
+ builder.tlsPolicy(policy);
+
+ ClientConfigurationData conf = builder.getConfForTesting();
+ assertTrue(conf.isUseTls());
+ assertEquals(conf.getTlsTrustCertsFilePath(), "/path/to/ca.pem");
+ assertEquals(conf.getTlsKeyFilePath(), "/path/to/client.key");
+ assertEquals(conf.getTlsCertificateFilePath(), "/path/to/client.cert");
+ assertTrue(conf.isTlsAllowInsecureConnection());
+ assertFalse(conf.isTlsHostnameVerificationEnable());
+ }
+
+ @Test
+ public void testTlsPolicyInsecureShortcut() {
+ // TlsPolicy.ofInsecure() is the dev convenience that disables
verification.
+ PulsarClientBuilderV5 builder = new PulsarClientBuilderV5();
+ builder.tlsPolicy(TlsPolicy.ofInsecure());
+
+ ClientConfigurationData conf = builder.getConfForTesting();
+ assertTrue(conf.isUseTls());
+ assertTrue(conf.isTlsAllowInsecureConnection());
+ assertFalse(conf.isTlsHostnameVerificationEnable());
+ }
+
+ @Test
+ public void testAuthenticationPluginAndParamsInstantiatesAuthentication()
throws Exception {
+ // Regression for a bug where authentication(plugin, params) only set
the strings and
+ // never called conf.setAuthentication(...) with the instantiated
Authentication object.
+ // PulsarClientImpl reads the Authentication instance via
conf.getAuthentication() at
+ // connect time — without it, the client connects with no credentials
and the broker
+ // rejects the handshake. Use the v4 AuthenticationDisabled stub which
always exists on
+ // the classpath; we only care that *some* instance lands on the conf.
+ PulsarClientBuilderV5 builder = new PulsarClientBuilderV5();
+ builder.authentication(
+ "org.apache.pulsar.client.impl.auth.AuthenticationDisabled",
"");
+
+ ClientConfigurationData conf = builder.getConfForTesting();
+ assertEquals(conf.getAuthPluginClassName(),
+ "org.apache.pulsar.client.impl.auth.AuthenticationDisabled");
+ assertNotNull(conf.getAuthentication(),
+ "Authentication instance must be created and attached to the
conf");
+ }
+
+ @Test
+ public void testAuthenticationPluginNotFoundIsWrapped() {
+ // A bad plugin class name should surface as V5 PulsarClientException
(not a v4 exception
+ // type leaking through the surface).
+ PulsarClientBuilderV5 builder = new PulsarClientBuilderV5();
+ assertThrows(PulsarClientException.class, () ->
+ builder.authentication("com.example.NoSuchAuth", ""));
+ }
+
private static IllegalArgumentException assertThrowsIAE(Runnable r) {
try {
r.run();
diff --git a/pulsar-testclient/build.gradle.kts
b/pulsar-testclient/build.gradle.kts
index e2e24c96b39..54637f9c0e7 100644
--- a/pulsar-testclient/build.gradle.kts
+++ b/pulsar-testclient/build.gradle.kts
@@ -26,6 +26,8 @@ dependencies {
implementation(libs.slog)
implementation(project(":pulsar-client-original"))
implementation(project(":pulsar-client-admin-original"))
+ implementation(project(":pulsar-client-api-v5"))
+ implementation(project(":pulsar-client-v5"))
implementation(project(":pulsar-client-messagecrypto-bc"))
implementation(project(":pulsar-broker"))
implementation(project(":pulsar-cli-utils"))
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdBase.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdBase.java
index 6d5796ad5dd..b1d1edd4421 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdBase.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/CmdBase.java
@@ -27,6 +27,11 @@ public abstract class CmdBase implements Callable<Integer> {
public CmdBase(String cmdName) {
commander = new CommandLine(this);
commander.setCommandName(cmdName);
+ // V5 enums (SubscriptionInitialPosition.EARLIEST,
ProducerAccessMode.SHARED, ...) are
+ // uppercase, while the v4 flags users have been passing for years use
mixed case
+ // ("Earliest", "Shared"). Picocli's default enum parsing is
case-sensitive, so we'd
+ // break flag UX without this. Match v4 + V5 spellings interchangeably.
+ commander.setCaseInsensitiveEnumValuesAllowed(true);
}
public boolean run(String[] args) {
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 1e7e40f374e..d8528c3e593 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -35,6 +35,11 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.ProxyProtocol;
+import org.apache.pulsar.client.api.v5.config.TlsPolicy;
import org.apache.pulsar.common.util.DirectMemoryUtils;
/**
@@ -113,6 +118,79 @@ public class PerfClientUtils {
return clientBuilder;
}
+ /**
+ * Build a V5 {@link PulsarClientBuilder} from the perf CLI arguments.
+ *
+ * <p>The V5 client is used by the perf commands so they work
transparently against both
+ * regular and scalable topics — the V5 SDK detects the topic kind via
{@code topic://} vs
+ * {@code persistent://} lookup and routes accordingly.
+ *
+ * <p>A few v4 settings have no direct V5 equivalent and are dropped here:
{@code --stats-
+ * interval-seconds} (V5 stats are OpenTelemetry-driven), {@code
--max-lookup-request} (V5
+ * does not expose a public knob), {@code --ssl-factory-plugin*} (V5 does
not have a pluggable
+ * SSL factory yet), and {@code --busy-wait} (no V5 equivalent). All other
relevant flags map
+ * 1:1.
+ */
+ public static PulsarClientBuilder
createV5ClientBuilderFromArguments(PerformanceBaseArguments arguments)
+ throws org.apache.pulsar.client.api.v5.PulsarClientException {
+
+ ConnectionPolicy.Builder connectionPolicy = ConnectionPolicy.builder()
+ .connectionsPerBroker(arguments.maxConnections)
+ .ioThreads(arguments.ioThreads)
+ .callbackThreads(arguments.listenerThreads);
+ if (isNotBlank(arguments.proxyServiceURL)) {
+ ProxyProtocol v5Proto = arguments.proxyProtocol != null
+ ? ProxyProtocol.valueOf(arguments.proxyProtocol.name())
+ : null;
+ connectionPolicy.proxy(arguments.proxyServiceURL, v5Proto);
+ }
+
+ PulsarClientBuilder builder =
org.apache.pulsar.client.api.v5.PulsarClient.builder()
+ .serviceUrl(arguments.serviceURL)
+ .memoryLimit(MemorySize.ofBytes(arguments.memoryLimit))
+ .connectionPolicy(connectionPolicy.build())
+ .openTelemetry(AutoConfiguredOpenTelemetrySdk.builder()
+ .addPropertiesSupplier(() ->
Map.of("otel.sdk.disabled", "true"))
+ .build().getOpenTelemetrySdk());
+
+ if (isNotBlank(arguments.authPluginClassName)) {
+ builder.authentication(arguments.authPluginClassName,
arguments.authParams);
+ }
+
+ // TLS: only wire a TlsPolicy if the user genuinely wants TLS. The
Boolean flags can come
+ // through as Boolean.FALSE (not null) when picocli's default-value
resolution fires even
+ // without the flag being passed, so we cannot treat "non-null" as
"user wanted TLS" —
+ // that would incorrectly enable TLS against a plaintext broker. The
rule:
+ // - TLS is "on" if the URL is pulsar+ssl://, or
+ // - a trust cert path was explicitly supplied, or
+ // - either boolean was explicitly set to TRUE.
+ // PulsarClientBuilderV5#tlsPolicy unconditionally flips useTls=true.
+ boolean tlsByUrl = arguments.serviceURL != null
+ && arguments.serviceURL.startsWith("pulsar+ssl://");
+ boolean tlsByTrustPath = isNotBlank(arguments.tlsTrustCertsFilePath);
+ boolean tlsByBoolean =
Boolean.TRUE.equals(arguments.tlsAllowInsecureConnection)
+ ||
Boolean.TRUE.equals(arguments.tlsHostnameVerificationEnable);
+ if (tlsByUrl || tlsByTrustPath || tlsByBoolean) {
+ TlsPolicy.Builder tls = TlsPolicy.builder();
+ if (isNotBlank(arguments.tlsTrustCertsFilePath)) {
+ tls.trustCertsFilePath(arguments.tlsTrustCertsFilePath);
+ }
+ if (arguments.tlsAllowInsecureConnection != null) {
+
tls.allowInsecureConnection(arguments.tlsAllowInsecureConnection);
+ }
+ if (arguments.tlsHostnameVerificationEnable != null) {
+
tls.enableHostnameVerification(arguments.tlsHostnameVerificationEnable);
+ }
+ builder.tlsPolicy(tls.build());
+ }
+
+ if (isNotBlank(arguments.listenerName)) {
+ builder.listenerName(arguments.listenerName);
+ }
+
+ return builder;
+ }
+
public static PulsarAdminBuilder
createAdminBuilderFromArguments(PerformanceBaseArguments arguments,
final
String adminUrl)
throws PulsarClientException.UnsupportedAuthenticationException {
@@ -196,6 +274,23 @@ public class PerfClientUtils {
}
}
+ /** {@link #closeClient(PulsarClient)} overload for the V5 client used by
the perf tools. */
+ public static void
closeClient(org.apache.pulsar.client.api.v5.PulsarClient client) {
+ if (client == null) {
+ return;
+ }
+ boolean wasInterrupted = Thread.currentThread().interrupted();
+ try {
+ client.close();
+ } catch (org.apache.pulsar.client.api.v5.PulsarClientException e) {
+ log.error().exception(e).log("Failed to close client");
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
/**
* Check if the throwable or any of its causes is an InterruptedException.
*
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index d699da466af..cc4f2106d4c 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -22,13 +22,16 @@ import static
org.apache.commons.lang3.StringUtils.isNotBlank;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileOutputStream;
import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.text.DecimalFormat;
+import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -39,18 +42,17 @@ import lombok.CustomLog;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.impl.ConsumerBase;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.TopicName;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@@ -58,9 +60,23 @@ import picocli.CommandLine.Option;
@Command(name = "consume", description = "Test pulsar consumer performance.")
@CustomLog
public class PerformanceConsumer extends PerformanceTopicListArguments{
+
+ /**
+ * Subscription type flag values. V5 has no single user-facing
SubscriptionType enum
+ * (StreamConsumer / QueueConsumer / CheckpointConsumer are separate
APIs); we accept
+ * the v4 names for back-compat and map them all to {@link QueueConsumer},
which gives
+ * Shared work-distribution semantics. {@code Exclusive} / {@code
Failover} log a
+ * warning at run time — they are not exactly emulated.
+ */
+ public enum SubscriptionType {
+ Exclusive,
+ Shared,
+ Failover,
+ Key_Shared
+ }
+
private static final LongAdder messagesReceived = new LongAdder();
private static final LongAdder bytesReceived = new LongAdder();
- private static final DecimalFormat dec = new DecimalFormat("0.000");
private static final LongAdder totalMessagesReceived = new LongAdder();
private static final LongAdder totalBytesReceived = new LongAdder();
@@ -102,7 +118,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
@Option(names = { "-sp", "--subscription-position" }, description =
"Subscription position")
- private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.Latest;
+ private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.LATEST;
@Option(names = { "-r", "--rate" }, description = "Simulate a slow message
consumer (rate in msg/s)")
public double rate = 0;
@@ -232,21 +248,47 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info().attr("config", w.writeValueAsString(this)).log("Starting
Pulsar performance consumer with config");
- final Recorder qRecorder = this.autoScaledReceiverQueueSize
- ? new Recorder(this.receiverQueueSize, 5) : null;
+ if (this.subscriptionType == SubscriptionType.Exclusive
+ || this.subscriptionType == SubscriptionType.Failover) {
+ log.warn().attr("type", this.subscriptionType)
+ .log("V5 has no exclusive/failover subscription type.
Falling back to QueueConsumer "
+ + "(Shared-style work distribution).
Latency/throughput numbers may not be "
+ + "directly comparable with the v4 client.");
+ }
+ if (this.autoScaledReceiverQueueSize) {
+ log.warn("--auto-scaled-receiver-queue-size has no V5 equivalent
and will be ignored.");
+ }
+ if (this.batchIndexAck) {
+ log.warn("--batch-index-ack has no V5 equivalent and will be
ignored.");
+ }
+ if (!this.poolMessages) {
+ log.info("--pool-messages has no effect on V5 (pooled messages are
not exposed).");
+ }
+ if (this.maxPendingChunkedMessage > 0 ||
this.expireTimeOfIncompleteChunkedMessageMs > 0
+ || this.autoAckOldestChunkedMessageOnQueueFull) {
+ log.warn("Chunked-message specific knobs (--max_chunked_msg / "
+ + "--expire_time_incomplete_chunked_messages /
--auto_ack_chunk_q_full) "
+ + "have no V5 equivalents and will be ignored.");
+ }
+ if (this.maxTotalReceiverQueueSizeAcrossPartitions != 50000) {
+ log.info("--receiver-queue-size-across-partitions has no V5
equivalent and will be ignored.");
+ }
+
final RateLimiter limiter = this.rate > 0 ?
RateLimiter.create(this.rate) : null;
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (this.testTime * 1e9);
- ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(this)
- .enableTransaction(this.isEnableTransaction);
-
+ PulsarClientBuilder clientBuilder =
PerfClientUtils.createV5ClientBuilderFromArguments(this);
+ if (this.isEnableTransaction) {
+ clientBuilder.transactionPolicy(TransactionPolicy.builder()
+ .timeout(Duration.ofSeconds(this.transactionTimeout))
+ .build());
+ }
PulsarClient pulsarClient = clientBuilder.build();
AtomicReference<Transaction> atomicReference;
if (this.isEnableTransaction) {
- atomicReference = new
AtomicReference<>(pulsarClient.newTransaction()
- .withTransactionTimeout(this.transactionTimeout,
TimeUnit.SECONDS).build().get());
+ atomicReference = new
AtomicReference<>(pulsarClient.newTransaction());
} else {
atomicReference = new AtomicReference<>(null);
}
@@ -254,173 +296,27 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
AtomicLong messageAckedCount = new AtomicLong();
Semaphore messageReceiveLimiter = new
Semaphore(this.numMessagesPerTransaction);
Thread thread = Thread.currentThread();
- MessageListener<ByteBuffer> listener = (consumer, msg) -> {
- if (this.testTime > 0) {
- if (System.nanoTime() > testEndTime) {
- log.info("------------------- DONE
-----------------------");
- PerfClientUtils.exit(0);
- thread.interrupt();
- }
- }
- if (this.totalNumTxn > 0) {
- if (totalEndTxnOpFailNum.sum() + totalEndTxnOpSuccessNum.sum()
>= this.totalNumTxn) {
- log.info("------------------- DONE
-----------------------");
- PerfClientUtils.exit(0);
- thread.interrupt();
- }
- }
- if (qRecorder != null) {
- qRecorder.recordValue(((ConsumerBase<?>)
consumer).getTotalIncomingMessages());
- }
- messagesReceived.increment();
- bytesReceived.add(msg.size());
-
- totalMessagesReceived.increment();
- totalBytesReceived.add(msg.size());
-
- if (this.numMessages > 0 && totalMessagesReceived.sum() >=
this.numMessages) {
- log.info("------------------- DONE -----------------------");
- PerfClientUtils.exit(0);
- thread.interrupt();
- }
-
- if (limiter != null) {
- limiter.acquire();
- }
-
- long latencyMillis = System.currentTimeMillis() -
msg.getPublishTime();
- if (latencyMillis >= 0) {
- if (latencyMillis >= MAX_LATENCY) {
- latencyMillis = MAX_LATENCY;
- }
- recorder.recordValue(latencyMillis);
- cumulativeRecorder.recordValue(latencyMillis);
- }
- if (this.isEnableTransaction) {
- try {
- messageReceiveLimiter.acquire();
- } catch (InterruptedException e){
- log.error().exception(e).log("Got error");
- Thread.currentThread().interrupt();
- }
- consumer.acknowledgeAsync(msg.getMessageId(),
atomicReference.get()).thenRun(() -> {
- totalMessageAck.increment();
- messageAck.increment();
- }).exceptionally(throwable ->{
- log.error().attr("message",
msg).exception(throwable).log("Ack message failed with exception");
- totalMessageAckFailed.increment();
- if (PerfClientUtils.hasInterruptedException(throwable)) {
- Thread.currentThread().interrupt();
- }
- return null;
- });
- } else {
- consumer.acknowledgeAsync(msg).thenRun(()->{
- totalMessageAck.increment();
- messageAck.increment();
- }
- ).exceptionally(throwable ->{
- if
(PerfClientUtils.hasInterruptedException(throwable)) {
- Thread.currentThread().interrupt();
- return null;
- }
- log.error()
- .attr("message", msg)
- .exception(throwable)
- .log("Ack message failed with exception");
- totalMessageAckFailed.increment();
- return null;
- }
- );
- }
- if (this.poolMessages) {
- msg.release();
- }
- if (this.isEnableTransaction
- && messageAckedCount.incrementAndGet() ==
this.numMessagesPerTransaction) {
- Transaction transaction = atomicReference.get();
- if (!this.isAbortTransaction) {
- transaction.commit()
- .thenRun(() -> {
- log.debug().attr("transaction",
transaction.getTxnID()).log("Commit transaction");
- totalEndTxnOpSuccessNum.increment();
- numTxnOpSuccess.increment();
- })
- .exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
- Thread.currentThread().interrupt();
- return null;
- }
- log.error().exception(exception).log("Commit
transaction failed with exception");
- totalEndTxnOpFailNum.increment();
- return null;
- });
- } else {
- transaction.abort().thenRun(() -> {
- log.debug().attr("transaction",
transaction.getTxnID()).log("Abort transaction");
- totalEndTxnOpSuccessNum.increment();
- numTxnOpSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
- Thread.currentThread().interrupt();
- return null;
- }
- log.error()
- .attr("transaction",
transaction.getTxnID().toString())
- .exception(exception)
- .log("Abort transaction failed with
exception");
- totalEndTxnOpFailNum.increment();
- return null;
- });
- }
- while (!Thread.currentThread().isInterrupted()) {
- try {
- Transaction newTransaction =
pulsarClient.newTransaction()
-
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
- .build().get();
- atomicReference.compareAndSet(transaction,
newTransaction);
- totalNumTxnOpenSuccess.increment();
- messageAckedCount.set(0);
-
messageReceiveLimiter.release(this.numMessagesPerTransaction);
- break;
- } catch (Exception e) {
- if (PerfClientUtils.hasInterruptedException(e)) {
- Thread.currentThread().interrupt();
- } else {
- log.error().exception(e).log("Failed to new
transaction with exception");
- totalNumTxnOpenFail.increment();
- }
- }
- }
- }
- };
-
- List<Future<Consumer<ByteBuffer>>> futures = new ArrayList<>();
- ConsumerBuilder<ByteBuffer> consumerBuilder =
pulsarClient.newConsumer(Schema.BYTEBUFFER) //
- .messageListener(listener) //
- .receiverQueueSize(this.receiverQueueSize) //
-
.maxTotalReceiverQueueSizeAcrossPartitions(this.maxTotalReceiverQueueSizeAcrossPartitions)
-
.acknowledgmentGroupTime(this.acknowledgmentsGroupingDelayMillis,
TimeUnit.MILLISECONDS) //
- .subscriptionType(this.subscriptionType)
- .subscriptionInitialPosition(this.subscriptionInitialPosition)
-
.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
- .enableBatchIndexAcknowledgment(this.batchIndexAck)
- .poolMessages(this.poolMessages)
- .replicateSubscriptionState(this.replicatedSubscription)
-
.autoScaledReceiverQueueSizeEnabled(this.autoScaledReceiverQueueSize);
- if (this.maxPendingChunkedMessage > 0) {
-
consumerBuilder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
- }
- if (this.expireTimeOfIncompleteChunkedMessageMs > 0) {
-
consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessageMs,
- TimeUnit.MILLISECONDS);
- }
+ QueueConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newQueueConsumer(Schema.bytes())
+ .receiverQueueSize(this.receiverQueueSize)
+
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+ .subscriptionInitialPosition(this.subscriptionInitialPosition);
if (isNotBlank(this.encKeyFile)) {
- consumerBuilder.defaultCryptoKeyReader(this.encKeyFile);
+ // We do not know the key name from --encryption-key-value-file
alone; PemFileKeyProvider
+ // expects a name → path mapping. The encryption test path uses
subscribers that name keys
+ // explicitly; here we register the file under the same name the
producer side used
+ // (defaults to the file path's last component if unset upstream).
+ String keyName = Path.of(this.encKeyFile).getFileName().toString();
+ PemFileKeyProvider keys = PemFileKeyProvider.builder()
+ .privateKey(keyName, Path.of(this.encKeyFile))
+ .build();
+ consumerBuilder.encryptionPolicy(ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(keys)
+ .build());
}
+ List<Future<QueueConsumer<byte[]>>> futures = new ArrayList<>();
for (int i = 0; i < this.numTopics; i++) {
final TopicName topicName = TopicName.get(this.topics.get(i));
@@ -432,13 +328,40 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
for (int j = 0; j < this.numSubscriptions; j++) {
String subscriberName = this.subscriptions.get(j);
for (int k = 0; k < this.numConsumers; k++) {
-
futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName)
- .subscribeAsync());
+ // V5 QueueConsumerBuilder has no clone(); build
per-consumer to set topic+sub.
+ QueueConsumerBuilder<byte[]> b =
pulsarClient.newQueueConsumer(Schema.bytes())
+ .receiverQueueSize(this.receiverQueueSize)
+
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+
.subscriptionInitialPosition(this.subscriptionInitialPosition)
+
.replicateSubscriptionState(this.replicatedSubscription)
+ .topic(topicName.toString())
+ .subscriptionName(subscriberName);
+ if (isNotBlank(this.encKeyFile)) {
+ String keyName =
Path.of(this.encKeyFile).getFileName().toString();
+ PemFileKeyProvider keys = PemFileKeyProvider.builder()
+ .privateKey(keyName, Path.of(this.encKeyFile))
+ .build();
+ b.encryptionPolicy(ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(keys)
+ .build());
+ }
+ futures.add(b.subscribeAsync());
}
}
}
- for (Future<Consumer<ByteBuffer>> future : futures) {
- future.get();
+ final List<QueueConsumer<byte[]>> consumers = new
ArrayList<>(futures.size());
+ for (Future<QueueConsumer<byte[]>> future : futures) {
+ consumers.add(future.get());
+ }
+
+ // V5 has no MessageListener — drive each consumer from a dedicated
poll thread that calls
+ // receive(timeout) and runs the same per-message handler the v4
listener did. One thread
+ // per consumer mirrors the v4 dispatch concurrency closely enough for
the perf workload.
+ ExecutorService consumerExec = Executors.newCachedThreadPool(
+ new DefaultThreadFactory("pulsar-perf-consumer-poll"));
+ for (QueueConsumer<byte[]> consumer : consumers) {
+ consumerExec.submit(() -> pollLoop(consumer, atomicReference,
messageAckedCount,
+ messageReceiveLimiter, limiter, testEndTime, thread,
pulsarClient));
}
log.info()
.attr("receiving", this.numConsumers)
@@ -455,7 +378,6 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
long oldTime = System.nanoTime();
Histogram reportHistogram = null;
- Histogram qHistogram = null;
HistogramLogWriter histogramLogWriter = null;
if (this.histogramFile != null) {
@@ -511,34 +433,6 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
reportHistogram.getValueAtPercentile(99.99),
reportHistogram.getMaxValue());
- if (this.autoScaledReceiverQueueSize && qRecorder != null) {
- qHistogram = qRecorder.getIntervalHistogram(qHistogram);
- log.debug()
- .attr("cnt", qHistogram.getTotalCount())
- .attr("mean", dec.format(qHistogram.getMean()))
- .attr("min", qHistogram.getMinValue())
- .attr("max", qHistogram.getMaxValue())
- .attr("pct", qHistogram.getValueAtPercentile(25))
- .attr("pct2", qHistogram.getValueAtPercentile(50))
- .attr("pct3", qHistogram.getValueAtPercentile(75))
- .log("ReceiverQueueUsage: cnt= ,mean= , min= ,max=
,25pct= ,50pct= ,75pct");
- qHistogram.reset();
- for (Future<Consumer<ByteBuffer>> future : futures) {
- ConsumerBase<?> consumerBase = (ConsumerBase<?>)
future.get();
- log.debug()
- .attr("consumerName",
consumerBase.getConsumerName())
- .attr("currentReceiverQueueSize",
consumerBase.getCurrentReceiverQueueSize())
- .log("CurrentReceiverQueueSize");
- if (consumerBase instanceof MultiTopicsConsumerImpl) {
- for (ConsumerImpl<?> consumer :
((MultiTopicsConsumerImpl<?>) consumerBase).getConsumers()) {
- log.debug()
- .attr("consumerName",
consumer.getConsumerName())
- .attr("currentReceiverQueueSize",
consumer.getCurrentReceiverQueueSize())
-
.log("SubConsumer.CurrentReceiverQueueSize");
- }
- }
- }
- }
if (histogramLogWriter != null) {
histogramLogWriter.outputIntervalHistogram(reportHistogram);
}
@@ -554,10 +448,181 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}
}
}
+ // Stop the poll threads before closing the client so receive() does
not race with close.
+ consumerExec.shutdownNow();
+ try {
+ if (!consumerExec.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warn("Consumer poll executor did not terminate within
timeout");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
PerfClientUtils.closeClient(pulsarClient);
PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
+ /**
+ * Per-consumer poll loop replacing the v4 {@code MessageListener}. Each
consumer gets one
+ * dedicated thread that drives {@code receive(timeout)} and runs the same
per-message
+ * handler the v4 listener did (latency record, rate-limit, ack,
transaction commit/rollover).
+ */
+ private void pollLoop(QueueConsumer<byte[]> consumer,
+ AtomicReference<Transaction> atomicReference,
+ AtomicLong messageAckedCount,
+ Semaphore messageReceiveLimiter,
+ RateLimiter limiter,
+ long testEndTime,
+ Thread mainThread,
+ PulsarClient pulsarClient) {
+ while (!Thread.currentThread().isInterrupted()) {
+ Message<byte[]> msg;
+ try {
+ msg = consumer.receive(Duration.ofSeconds(1));
+ } catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ log.warn().exception(e).log("receive failed; retrying");
+ continue;
+ }
+ if (msg == null) {
+ continue;
+ }
+
+ if (this.testTime > 0 && System.nanoTime() > testEndTime) {
+ log.info("------------------- DONE -----------------------");
+ PerfClientUtils.exit(0);
+ mainThread.interrupt();
+ return;
+ }
+ if (this.totalNumTxn > 0
+ && totalEndTxnOpFailNum.sum() +
totalEndTxnOpSuccessNum.sum() >= this.totalNumTxn) {
+ log.info("------------------- DONE -----------------------");
+ PerfClientUtils.exit(0);
+ mainThread.interrupt();
+ return;
+ }
+ messagesReceived.increment();
+ bytesReceived.add(msg.size());
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(msg.size());
+
+ if (this.numMessages > 0 && totalMessagesReceived.sum() >=
this.numMessages) {
+ log.info("------------------- DONE -----------------------");
+ PerfClientUtils.exit(0);
+ mainThread.interrupt();
+ return;
+ }
+
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ long latencyMillis = System.currentTimeMillis() -
msg.publishTime().toEpochMilli();
+ if (latencyMillis >= 0) {
+ if (latencyMillis >= MAX_LATENCY) {
+ latencyMillis = MAX_LATENCY;
+ }
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
+ }
+
+ // Ack — V5 acknowledge is synchronous void. Catch any failure
into the existing counter.
+ if (this.isEnableTransaction) {
+ try {
+ messageReceiveLimiter.acquire();
+ } catch (InterruptedException e) {
+ log.error().exception(e).log("Got error");
+ Thread.currentThread().interrupt();
+ }
+ Transaction txn = atomicReference.get();
+ try {
+ consumer.acknowledge(msg.id(), txn);
+ totalMessageAck.increment();
+ messageAck.increment();
+ } catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error().exception(e).log("Ack message failed with
exception");
+ totalMessageAckFailed.increment();
+ }
+ }
+ } else {
+ try {
+ consumer.acknowledge(msg.id());
+ totalMessageAck.increment();
+ messageAck.increment();
+ } catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error().exception(e).log("Ack message failed with
exception");
+ totalMessageAckFailed.increment();
+ }
+ }
+ }
+
+ // Transaction commit / rollover after numMessagesPerTransaction
acks.
+ if (this.isEnableTransaction
+ && messageAckedCount.incrementAndGet() ==
this.numMessagesPerTransaction) {
+ Transaction transaction = atomicReference.get();
+ if (!this.isAbortTransaction) {
+ transaction.async().commit()
+ .thenRun(() -> {
+ log.debug().log("Commit transaction");
+ totalEndTxnOpSuccessNum.increment();
+ numTxnOpSuccess.increment();
+ })
+ .exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ log.error().exception(exception).log("Commit
transaction failed with exception");
+ totalEndTxnOpFailNum.increment();
+ return null;
+ });
+ } else {
+ transaction.async().abort()
+ .thenRun(() -> {
+ log.debug().log("Abort transaction");
+ totalEndTxnOpSuccessNum.increment();
+ numTxnOpSuccess.increment();
+ })
+ .exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ log.error().exception(exception)
+ .log("Abort transaction failed with
exception");
+ totalEndTxnOpFailNum.increment();
+ return null;
+ });
+ }
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Transaction newTransaction =
pulsarClient.newTransaction();
+ atomicReference.compareAndSet(transaction,
newTransaction);
+ totalNumTxnOpenSuccess.increment();
+ messageAckedCount.set(0);
+
messageReceiveLimiter.release(this.numMessagesPerTransaction);
+ break;
+ } catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error().exception(e).log("Failed to new
transaction with exception");
+ totalNumTxnOpenFail.increment();
+ }
+ }
+ }
+ }
+ }
+ }
+
private void printAggregatedThroughput(long start) {
double elapsed = (System.nanoTime() - start) / 1e9;
double rate = totalMessagesReceived.sum() / elapsed;
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 05ad9f6254d..2978456b8b2 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -35,6 +35,8 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -56,16 +58,24 @@ import org.HdrHistogram.Recorder;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerAccessMode;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncMessageBuilder;
+import org.apache.pulsar.client.api.v5.async.AsyncProducer;
+import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
+import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
+import org.apache.pulsar.client.api.v5.config.CompressionType;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import picocli.CommandLine.Command;
@@ -215,7 +225,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
public String messageKeyGenerationMode = null;
@Option(names = { "-am", "--access-mode" }, description = "Producer access
mode")
- public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
+ public ProducerAccessMode producerAccessMode = ProducerAccessMode.SHARED;
@Option(names = { "-fp", "--format-payload" },
description = "Format %%i as a message index in the stream from
producer and/or %%t as the timestamp"
@@ -463,43 +473,51 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
}
}
- @SuppressWarnings("deprecation")
- ProducerBuilder<byte[]> createProducerBuilder(PulsarClient client, int
producerId) {
- ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
- .sendTimeout(this.sendTimeout, TimeUnit.SECONDS) //
- .compressionType(this.compression) //
- .maxPendingMessages(this.maxOutstanding) //
+ ProducerBuilder<byte[]> createProducerBuilder(PulsarClient client, int
producerId, String topic) {
+ ProducerBuilder<byte[]> producerBuilder =
client.newProducer(Schema.bytes())
+ .topic(topic)
+ .sendTimeout(Duration.ofSeconds(this.sendTimeout))
+ .compressionPolicy(CompressionPolicy.of(this.compression))
.accessMode(this.producerAccessMode)
- // enable round robin message routing if it is a partitioned
topic
- .messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
- if (this.maxPendingMessagesAcrossPartitions > 0) {
-
producerBuilder.maxPendingMessagesAcrossPartitions(this.maxPendingMessagesAcrossPartitions);
- }
+ .blockIfQueueFull(true);
+
+ // V5 does not expose maxPendingMessages /
maxPendingMessagesAcrossPartitions /
+ // messageRoutingMode as user-configurable knobs; the SDK manages
memory via the
+ // client-level MemorySize policy and routes appropriately for regular
and scalable
+ // topics. The legacy --max-outstanding /
--max-outstanding-across-partitions flags
+ // are accepted for back-compat but have no effect on the V5 client.
if (this.producerName != null) {
- String producerName = String.format("%s%s%d", this.producerName,
this.separator, producerId);
- producerBuilder.producerName(producerName);
+ producerBuilder.producerName(String.format("%s%s%d",
this.producerName, this.separator, producerId));
}
- if (this.disableBatching || (this.batchTimeMillis <= 0.0 &&
this.batchMaxMessages <= 0)) {
- producerBuilder.enableBatching(false);
+ // Batching and chunking are mutually exclusive. Chunking wins when
both are requested.
+ if (this.chunkingAllowed) {
+
producerBuilder.chunkingPolicy(ChunkingPolicy.builder().enabled(true).build());
+ producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
+ } else if (this.disableBatching || (this.batchTimeMillis <= 0.0 &&
this.batchMaxMessages <= 0)) {
+ producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
} else {
- long batchTimeUsec = (long) (this.batchTimeMillis * 1000);
- producerBuilder.batchingMaxPublishDelay(batchTimeUsec,
TimeUnit.MICROSECONDS).enableBatching(true);
- }
- if (this.batchMaxMessages > 0) {
- producerBuilder.batchingMaxMessages(this.batchMaxMessages);
- }
- if (this.batchMaxBytes > 0) {
- producerBuilder.batchingMaxBytes(this.batchMaxBytes);
+ BatchingPolicy.Builder batching = BatchingPolicy.builder()
+ .enabled(true)
+ .maxPublishDelay(Duration.ofNanos((long)
(this.batchTimeMillis * 1_000_000)));
+ if (this.batchMaxMessages > 0) {
+ batching.maxMessages(this.batchMaxMessages);
+ }
+ if (this.batchMaxBytes > 0) {
+ batching.maxSize(MemorySize.ofBytes(this.batchMaxBytes));
+ }
+ producerBuilder.batchingPolicy(batching.build());
}
- // Block if queue is full else we will start seeing errors in sendAsync
- producerBuilder.blockIfQueueFull(true);
-
if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyFile)) {
- producerBuilder.addEncryptionKey(this.encKeyName);
- producerBuilder.defaultCryptoKeyReader(this.encKeyFile);
+ PemFileKeyProvider keyProvider = PemFileKeyProvider.builder()
+ .publicKey(this.encKeyName,
java.nio.file.Path.of(this.encKeyFile))
+ .build();
+ producerBuilder.encryptionPolicy(ProducerEncryptionPolicy.builder()
+ .publicKeyProvider(keyProvider)
+ .keyName(this.encKeyName)
+ .build());
}
return producerBuilder;
@@ -515,24 +533,19 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
PulsarClient client = null;
boolean produceEnough = false;
try {
- // Now processing command line arguments
List<Future<Producer<byte[]>>> futures = new ArrayList<>();
-
- ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(arguments)
- .enableTransaction(this.isEnableTransaction);
-
+ PulsarClientBuilder clientBuilder =
PerfClientUtils.createV5ClientBuilderFromArguments(arguments);
+ if (this.isEnableTransaction) {
+ clientBuilder.transactionPolicy(TransactionPolicy.builder()
+ .timeout(Duration.ofSeconds(this.transactionTimeout))
+ .build());
+ }
client = clientBuilder.build();
- ProducerBuilder<byte[]> producerBuilder =
createProducerBuilder(client, producerId);
-
AtomicReference<Transaction> transactionAtomicReference;
if (this.isEnableTransaction) {
- producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
- transactionAtomicReference = new
AtomicReference<>(client.newTransaction()
- .withTransactionTimeout(this.transactionTimeout,
TimeUnit.SECONDS)
- .build()
- .get());
+ transactionAtomicReference = new
AtomicReference<>(client.newTransaction());
} else {
transactionAtomicReference = new AtomicReference<>(null);
}
@@ -543,11 +556,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
log.info().attr("adding", this.numProducers).attr("topic",
topic).log("Adding publishers on topic");
for (int j = 0; j < this.numProducers; j++) {
- ProducerBuilder<byte[]> prodBuilder =
producerBuilder.clone().topic(topic);
- if (this.chunkingAllowed) {
- prodBuilder.enableChunking(true);
- prodBuilder.enableBatching(false);
- }
+ ProducerBuilder<byte[]> prodBuilder =
createProducerBuilder(client, producerId, topic);
futures.add(prodBuilder.createAsync());
}
}
@@ -557,6 +566,10 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
producers.add(future.get());
}
Collections.shuffle(producers);
+ final List<AsyncProducer<byte[]>> asyncProducers = new
ArrayList<>(producers.size());
+ for (Producer<byte[]> p : producers) {
+ asyncProducers.add(p.async());
+ }
log.info().attr("created", producers.size()).log("Created
producers");
@@ -577,11 +590,16 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
AtomicLong totalSent = new AtomicLong(0);
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new
Semaphore(this.numMessagesPerTransaction);
+ // Send futures of the in-flight transaction. V5 transaction-aware
sends are queued
+ // onto an internal dispatch chain, so the v4-side txn-coordinator
registration can
+ // lag the local counter; we await these before committing so
commit never races
+ // ahead of the sends (otherwise the broker rejects with
InvalidTxnStatusException).
+ final List<java.util.concurrent.CompletableFuture<?>>
pendingTxnSends = new ArrayList<>();
while (!Thread.currentThread().isInterrupted()) {
if (produceEnough) {
break;
}
- for (Producer<byte[]> producer : producers) {
+ for (AsyncProducer<byte[]> producer : asyncProducers) {
if (this.testTime > 0) {
if (System.nanoTime() > testEndTime) {
log.info()
@@ -622,7 +640,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
} else {
payloadData = payloadBytes;
}
- TypedMessageBuilder<byte[]> messageBuilder;
+ AsyncMessageBuilder<byte[]> messageBuilder =
producer.newMessage().value(payloadData);
if (this.isEnableTransaction) {
if (this.numMessagesPerTransaction > 0) {
try {
@@ -632,21 +650,17 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
Thread.currentThread().interrupt();
}
}
- messageBuilder = producer.newMessage(transaction)
- .value(payloadData);
- } else {
- messageBuilder = producer.newMessage()
- .value(payloadData);
+ messageBuilder.transaction(transaction);
}
if (this.delay > 0) {
- messageBuilder.deliverAfter(this.delay,
TimeUnit.SECONDS);
+
messageBuilder.deliverAfter(Duration.ofSeconds(this.delay));
} else if (this.delayRange != null) {
final long deliverAfter = ThreadLocalRandom.current()
.nextLong(this.delayRange.lowerEndpoint(),
this.delayRange.upperEndpoint());
- messageBuilder.deliverAfter(deliverAfter,
TimeUnit.SECONDS);
+
messageBuilder.deliverAfter(Duration.ofSeconds(deliverAfter));
}
if (this.setEventTime) {
- messageBuilder.eventTime(System.currentTimeMillis());
+ messageBuilder.eventTime(Instant.now());
}
//generate msg key
if (msgKeyMode == MessageKeyGenerationMode.random) {
@@ -655,7 +669,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
messageBuilder.key(String.valueOf(totalSent.get()));
}
PulsarClient pulsarClient = client;
- messageBuilder.sendAsync().thenRun(() -> {
+ var sendFuture = messageBuilder.send().thenRun(() -> {
bytesSent.add(payloadData.length);
messagesSent.increment();
totalSent.incrementAndGet();
@@ -690,14 +704,28 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
}
return null;
});
+ if (this.isEnableTransaction) {
+ pendingTxnSends.add(sendFuture);
+ }
if (this.isEnableTransaction
&& numMessageSend.incrementAndGet() ==
this.numMessagesPerTransaction) {
+ // Await all sends issued under this transaction
before committing, so the
+ // txn coordinator has registered every send. The
chain above already
+ // swallows per-send failures, so this join never
throws on a send error.
+ try {
+ java.util.concurrent.CompletableFuture.allOf(
+ pendingTxnSends.toArray(new
java.util.concurrent.CompletableFuture[0]))
+ .join();
+ } catch (Exception awaitEx) {
+ if
(PerfClientUtils.hasInterruptedException(awaitEx)) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ pendingTxnSends.clear();
if (!this.isAbortTransaction) {
- transaction.commit()
+ transaction.async().commit()
.thenRun(() -> {
- log.debug()
- .attr("transaction",
transaction.getTxnID().toString())
- .log("Committed transaction");
+ log.debug().log("Committed
transaction");
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
})
@@ -713,10 +741,8 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
return null;
});
} else {
- transaction.abort().thenRun(() -> {
- log.debug()
- .attr("transaction",
transaction.getTxnID().toString())
- .log("Abort transaction");
+ transaction.async().abort().thenRun(() -> {
+ log.debug().log("Abort transaction");
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
}).exceptionally(exception -> {
@@ -725,7 +751,6 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
return null;
}
log.error()
- .attr("transaction",
transaction.getTxnID().toString())
.exception(exception)
.log("Abort transaction failed with
exception");
totalEndTxnOpFailNum.increment();
@@ -734,9 +759,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
}
while (!Thread.currentThread().isInterrupted()) {
try {
- Transaction newTransaction =
pulsarClient.newTransaction()
-
.withTransactionTimeout(this.transactionTimeout,
-
TimeUnit.SECONDS).build().get();
+ Transaction newTransaction =
pulsarClient.newTransaction();
transactionAtomicReference.compareAndSet(transaction, newTransaction);
numMessageSend.set(0);
numMsgPerTxnLimit.release(this.numMessagesPerTransaction);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 55aa1f345dc..a443359be7a 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -22,23 +22,26 @@ import static
org.apache.pulsar.testclient.PerfClientUtils.addShutdownHook;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import lombok.CustomLog;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.v5.Checkpoint;
+import org.apache.pulsar.client.api.v5.CheckpointConsumer;
+import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import picocli.CommandLine.Command;
@@ -86,16 +89,16 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
@Override
public void validate() throws Exception {
super.validate();
- if (startMessageId != "earliest" && startMessageId != "latest"
- && (startMessageId.split(":")).length != 2) {
- String errMsg = String.format("invalid start message ID '%s', must
be either either 'earliest', "
- + "'latest' or a specific message id by using 'lid:eid'",
startMessageId);
- throw new Exception(errMsg);
+ // V5 CheckpointConsumer accepts earliest / latest / a serialized
Checkpoint byte array.
+ // It does not expose the v4 "lid:eid" specific MessageId form, so
reject it explicitly.
+ if (!"earliest".equals(startMessageId) &&
!"latest".equals(startMessageId)) {
+ throw new Exception(String.format("invalid start message ID '%s'.
V5 CheckpointConsumer "
+ + "only accepts 'earliest' or 'latest'; the v4 'lid:eid'
form is not supported.",
+ startMessageId));
}
}
@Override
- @SuppressWarnings("deprecation")
public void run() throws Exception {
// Dump config variables
PerfClientUtils.printJVMInformation(log);
@@ -103,59 +106,45 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info().attr("config", w.writeValueAsString(this)).log("Starting
Pulsar performance reader with config");
- final RateLimiter limiter = this.rate > 0 ?
RateLimiter.create(this.rate) : null;
- ReaderListener<byte[]> listener = (reader, msg) -> {
- messagesReceived.increment();
- bytesReceived.add(msg.getData().length);
-
- totalMessagesReceived.increment();
- totalBytesReceived.add(msg.getData().length);
-
- if (this.numMessages > 0 && totalMessagesReceived.sum() >=
this.numMessages) {
- log.info().attr("number", this.numMessages).log("DONE (reached
the maximum number: of consumption");
- PerfClientUtils.exit(0);
- }
-
- if (limiter != null) {
- limiter.acquire();
- }
-
- long latencyMillis = System.currentTimeMillis() -
msg.getPublishTime();
- if (latencyMillis >= 0) {
- recorder.recordValue(latencyMillis);
- cumulativeRecorder.recordValue(latencyMillis);
- }
- };
-
- ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(this)
- .enableTls(this.useTls);
+ if (this.useTls) {
+ log.info("--use-tls has no effect on V5 (TLS is enabled
automatically when the service URL "
+ + "uses pulsar+ssl:// — pass that scheme via --service-url
instead).");
+ }
+ if (this.receiverQueueSize != 1000) {
+ log.info("--receiver-queue-size has no effect on V5
CheckpointConsumer.");
+ }
- PulsarClient pulsarClient = clientBuilder.build();
+ final RateLimiter limiter = this.rate > 0 ?
RateLimiter.create(this.rate) : null;
- List<CompletableFuture<Reader<byte[]>>> futures = new ArrayList<>();
+ PulsarClient pulsarClient =
PerfClientUtils.createV5ClientBuilderFromArguments(this).build();
- MessageId startMessageId;
- if ("earliest".equals(this.startMessageId)) {
- startMessageId = MessageId.earliest;
- } else if ("latest".equals(this.startMessageId)) {
- startMessageId = MessageId.latest;
- } else {
- String[] parts = this.startMessageId.split(":");
- startMessageId = new MessageIdImpl(Long.parseLong(parts[0]),
Long.parseLong(parts[1]), -1);
- }
+ List<CompletableFuture<CheckpointConsumer<byte[]>>> futures = new
ArrayList<>();
- ReaderBuilder<byte[]> readerBuilder = pulsarClient.newReader() //
- .readerListener(listener) //
- .receiverQueueSize(this.receiverQueueSize) //
- .startMessageId(startMessageId);
+ Checkpoint startPosition = "earliest".equals(this.startMessageId)
+ ? Checkpoint.earliest()
+ : Checkpoint.latest();
for (int i = 0; i < this.numTopics; i++) {
final TopicName topicName = TopicName.get(this.topics.get(i));
-
-
futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync());
+ CheckpointConsumerBuilder<byte[]> b =
pulsarClient.newCheckpointConsumer(Schema.bytes())
+ .topic(topicName.toString())
+ .startPosition(startPosition);
+ futures.add(b.createAsync());
}
FutureUtil.waitForAll(futures).get();
+ final List<CheckpointConsumer<byte[]>> consumers = new
ArrayList<>(futures.size());
+ for (CompletableFuture<CheckpointConsumer<byte[]>> future : futures) {
+ consumers.add(future.get());
+ }
+
+ // V5 has no ReaderListener — drive each consumer from a dedicated
poll thread that calls
+ // receive(timeout) and runs the same per-message handler the v4
listener did.
+ ExecutorService readerExec = Executors.newCachedThreadPool(
+ new DefaultThreadFactory("pulsar-perf-reader-poll"));
+ for (CheckpointConsumer<byte[]> consumer : consumers) {
+ readerExec.submit(() -> readLoop(consumer, limiter));
+ }
log.info().attr("reading", this.numTopics).log("Start reading from
topics");
@@ -215,9 +204,64 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
oldTime = now;
}
+ readerExec.shutdownNow();
+ try {
+ if (!readerExec.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.warn("Reader poll executor did not terminate within
timeout");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
PerfClientUtils.closeClient(pulsarClient);
PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
+
+ /**
+ * Per-consumer poll loop replacing the v4 {@code ReaderListener}. Drives
+ * {@code receive(timeout)} on the CheckpointConsumer and runs the same
per-message
+ * counters + latency record + rate-limit the v4 listener did.
+ */
+ private void readLoop(CheckpointConsumer<byte[]> consumer, RateLimiter
limiter) {
+ while (!Thread.currentThread().isInterrupted()) {
+ Message<byte[]> msg;
+ try {
+ msg = consumer.receive(Duration.ofSeconds(1));
+ } catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ log.warn().exception(e).log("receive failed; retrying");
+ continue;
+ }
+ if (msg == null) {
+ continue;
+ }
+
+ byte[] data = msg.value();
+ messagesReceived.increment();
+ bytesReceived.add(data.length);
+ totalMessagesReceived.increment();
+ totalBytesReceived.add(data.length);
+
+ if (this.numMessages > 0 && totalMessagesReceived.sum() >=
this.numMessages) {
+ log.info().attr("number", this.numMessages).log("DONE (reached
the maximum number: of consumption");
+ PerfClientUtils.exit(0);
+ return;
+ }
+
+ if (limiter != null) {
+ limiter.acquire();
+ }
+
+ long latencyMillis = System.currentTimeMillis() -
msg.publishTime().toEpochMilli();
+ if (latencyMillis >= 0) {
+ recorder.recordValue(latencyMillis);
+ cumulativeRecorder.recordValue(latencyMillis);
+ }
+ }
+ }
+
private static void printAggregatedThroughput(long start) {
double elapsed = (System.nanoTime() - start) / 1e9;
double rate = totalMessagesReceived.sum() / elapsed;
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 381c4548913..73c1f79497c 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -46,18 +47,19 @@ import org.HdrHistogram.Recorder;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncProducer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
@@ -66,6 +68,15 @@ import picocli.CommandLine.Option;
@CustomLog
public class PerformanceTransaction extends PerformanceBaseArguments{
+ /** Same v4-compat SubscriptionType flag as PerformanceConsumer. See its
javadoc. */
+ public enum SubscriptionType {
+ Exclusive,
+ Shared,
+ Failover,
+ Key_Shared
+ }
+
+
private static final LongAdder totalNumEndTxnOpFailed = new LongAdder();
private static final LongAdder totalNumEndTxnOpSuccess = new LongAdder();
private static final LongAdder numTxnOpSuccess = new LongAdder();
@@ -122,7 +133,7 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
public int numSubscriptions = 1;
@Option(names = {"-sp", "--subscription-position"}, description =
"Subscription position")
- private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.Earliest;
+ private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.EARLIEST;
@Option(names = {"-st", "--subscription-type"}, description =
"Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Shared;
@@ -226,9 +237,19 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
}
}
- ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(this)
- .enableTransaction(!this.isDisableTransaction);
+ if (this.subscriptionType == SubscriptionType.Exclusive
+ || this.subscriptionType == SubscriptionType.Failover) {
+ log.warn().attr("type", this.subscriptionType)
+ .log("V5 has no exclusive/failover subscription type.
Falling back to QueueConsumer "
+ + "(Shared-style work distribution).");
+ }
+ PulsarClientBuilder clientBuilder =
PerfClientUtils.createV5ClientBuilderFromArguments(this);
+ if (!this.isDisableTransaction) {
+ clientBuilder.transactionPolicy(TransactionPolicy.builder()
+ .timeout(Duration.ofSeconds(this.transactionTimeout))
+ .build());
+ }
PulsarClient client = clientBuilder.build();
try {
@@ -261,16 +282,13 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
//responsible for the production and consumption tasks of
the transaction through the loop.
//A thread may perform tasks of multiple transactions in a
traversing manner.
List<Producer<byte[]>> producers = null;
- List<List<Consumer<byte[]>>> consumers = null;
+ List<List<QueueConsumer<byte[]>>> consumers = null;
AtomicReference<Transaction> atomicReference = null;
try {
producers = buildProducers(client);
consumers = buildConsumer(client);
if (!this.isDisableTransaction) {
- atomicReference = new
AtomicReference<>(client.newTransaction()
-
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
- .build()
- .get());
+ atomicReference = new
AtomicReference<>(client.newTransaction());
} else {
atomicReference = new AtomicReference<>(null);
}
@@ -310,8 +328,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
}
}
Transaction transaction = atomicReference.get();
- for (List<Consumer<byte[]>> subscriptions : consumers)
{
- for (Consumer<byte[]> consumer : subscriptions) {
+ for (List<QueueConsumer<byte[]>> subscriptions :
consumers) {
+ for (QueueConsumer<byte[]> consumer :
subscriptions) {
for (int j = 0; j <
this.numMessagesReceivedPerTransaction; j++) {
Message<byte[]> message = null;
try {
@@ -322,102 +340,85 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
PerfClientUtils.exit(1);
}
long receiveTime = System.nanoTime();
- if (!this.isDisableTransaction) {
-
consumer.acknowledgeAsync(message.getMessageId(), transaction)
- .thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime()
- receiveTime);
-
messageAckRecorder.recordValue(latencyMicros);
-
messageAckCumulativeRecorder.recordValue(latencyMicros);
-
numMessagesAckSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
-
Thread.currentThread().interrupt();
- return null;
- }
- log.error()
-
.attr("transaction", transaction)
-
.exception(exception)
- .log("Ack message
failed with transaction throw exception");
-
numMessagesAckFailed.increment();
- return null;
- });
- } else {
-
consumer.acknowledgeAsync(message).thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() -
receiveTime);
-
messageAckRecorder.recordValue(latencyMicros);
-
messageAckCumulativeRecorder.recordValue(latencyMicros);
- numMessagesAckSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
-
Thread.currentThread().interrupt();
- return null;
- }
+ // V5 acknowledge is synchronous void.
Record latency immediately
+ // and catch any failure into the existing
counter.
+ try {
+ if (!this.isDisableTransaction) {
+ consumer.acknowledge(message.id(),
transaction);
+ } else {
+ consumer.acknowledge(message.id());
+ }
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime() -
receiveTime);
+
messageAckRecorder.recordValue(latencyMicros);
+
messageAckCumulativeRecorder.recordValue(latencyMicros);
+ numMessagesAckSuccess.increment();
+ } catch (Exception ackEx) {
+ if
(PerfClientUtils.hasInterruptedException(ackEx)) {
+ Thread.currentThread().interrupt();
+ } else {
log.error()
- .attr("transaction",
transaction)
- .exception(exception)
+ .exception(ackEx)
.log("Ack message failed
with transaction throw exception");
numMessagesAckFailed.increment();
- return null;
- });
+ }
}
}
}
}
+ // V5 transaction-aware sends are queued onto an
internal dispatch chain,
+ // so the v4-side txn-coordinator registration of the
send can race the
+ // commit() if commit fires before the chain drains.
We collect each
+ // per-txn send future here and await them all before
committing — this is
+ // also the semantically-correct ordering (commit only
after sends land).
+
java.util.List<java.util.concurrent.CompletableFuture<?>> pendingSends =
+ new java.util.ArrayList<>();
for (Producer<byte[]> producer : producers) {
+ AsyncProducer<byte[]> asyncProducer =
producer.async();
for (int j = 0; j <
this.numMessagesProducedPerTransaction; j++) {
long sendTime = System.nanoTime();
+ var msg =
asyncProducer.newMessage().value(payloadBytes);
if (!this.isDisableTransaction) {
-
producer.newMessage(transaction).value(payloadBytes)
- .sendAsync().thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() -
sendTime);
-
messageSendRecorder.recordValue(latencyMicros);
-
messageSendRCumulativeRecorder.recordValue(latencyMicros);
-
numMessagesSendSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
-
Thread.currentThread().interrupt();
- return null;
- }
- // Ignore the exception when
the producer is closed
- if (exception.getCause()
- instanceof
PulsarClientException.AlreadyClosedException) {
- return null;
- }
- log.error()
- .exception(exception)
- .log("Send transaction
message failed with exception");
-
numMessagesSendFailed.increment();
- return null;
- });
- } else {
- producer.newMessage().value(payloadBytes)
- .sendAsync().thenRun(() -> {
- long latencyMicros =
NANOSECONDS.toMicros(
- System.nanoTime() -
sendTime);
-
messageSendRecorder.recordValue(latencyMicros);
-
messageSendRCumulativeRecorder.recordValue(latencyMicros);
-
numMessagesSendSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
-
Thread.currentThread().interrupt();
- return null;
- }
- // Ignore the exception when
the producer is closed
- if (exception.getCause()
- instanceof
PulsarClientException.AlreadyClosedException) {
- return null;
- }
- log.error()
- .exception(exception)
- .log("Send message
failed with exception");
-
numMessagesSendFailed.increment();
- return null;
- });
+ msg.transaction(transaction);
}
+ pendingSends.add(msg.send().whenComplete((id,
ex) -> {
+ if (ex == null) {
+ long latencyMicros =
NANOSECONDS.toMicros(
+ System.nanoTime() - sendTime);
+
messageSendRecorder.recordValue(latencyMicros);
+
messageSendRCumulativeRecorder.recordValue(latencyMicros);
+ numMessagesSendSuccess.increment();
+ } else {
+ if
(PerfClientUtils.hasInterruptedException(ex)) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ // Ignore the exception when the
producer is closed
+ if (ex.getCause()
+ instanceof
PulsarClientException.AlreadyClosedException) {
+ return;
+ }
+ log.error()
+ .exception(ex)
+ .log("Send message failed with
exception");
+ numMessagesSendFailed.increment();
+ }
+ }));
+ }
+ }
+
+ // Await all pending sends before committing so the
txn-coordinator has
+ // registered every send. allOf().exceptionally()
swallows individual send
+ // failures here — they are already counted in the
whenComplete above.
+ try {
+ java.util.concurrent.CompletableFuture.allOf(
+ pendingSends.toArray(new
java.util.concurrent.CompletableFuture[0]))
+ .exceptionally(t -> null)
+ .join();
+ } catch (Exception awaitEx) {
+ if
(PerfClientUtils.hasInterruptedException(awaitEx)) {
+ Thread.currentThread().interrupt();
}
}
@@ -426,7 +427,7 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
}
if (!this.isDisableTransaction) {
if (!this.isAbortTransaction) {
- transaction.commit()
+ transaction.async().commit()
.thenRun(() -> {
numTxnOpSuccess.increment();
totalNumEndTxnOpSuccess.increment();
@@ -436,35 +437,31 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
return null;
}
log.error()
- .attr("transaction",
transaction.getTxnID().toString())
.exception(exception)
.log("Commit transaction
failed with exception");
totalNumEndTxnOpFailed.increment();
return null;
});
} else {
- transaction.abort().thenRun(() -> {
- numTxnOpSuccess.increment();
- totalNumEndTxnOpSuccess.increment();
- }).exceptionally(exception -> {
- if
(PerfClientUtils.hasInterruptedException(exception)) {
- Thread.currentThread().interrupt();
- return null;
- }
- log.error()
- .attr("transaction",
transaction.getTxnID().toString())
- .exception(exception)
- .log("Commit transaction failed
with exception");
- totalNumEndTxnOpFailed.increment();
- return null;
- });
+ transaction.async().abort()
+ .thenRun(() -> {
+ numTxnOpSuccess.increment();
+
totalNumEndTxnOpSuccess.increment();
+ }).exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
+ return null;
+ }
+ log.error()
+ .exception(exception)
+ .log("Abort transaction
failed with exception");
+ totalNumEndTxnOpFailed.increment();
+ return null;
+ });
}
while (!Thread.currentThread().isInterrupted()) {
try {
- Transaction newTransaction =
client.newTransaction()
-
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
- .build()
- .get();
+ Transaction newTransaction =
client.newTransaction();
atomicReference.compareAndSet(transaction,
newTransaction);
totalNumTxnOpenTxnSuccess.increment();
break;
@@ -631,29 +628,29 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
- private List<List<Consumer<byte[]>>> buildConsumer(PulsarClient client)
+ private List<List<QueueConsumer<byte[]>>> buildConsumer(PulsarClient
client)
throws ExecutionException, InterruptedException {
- ConsumerBuilder<byte[]> consumerBuilder =
client.newConsumer(Schema.BYTES)
- .subscriptionType(this.subscriptionType)
- .receiverQueueSize(this.receiverQueueSize)
- .subscriptionInitialPosition(this.subscriptionInitialPosition)
- .replicateSubscriptionState(this.replicatedSubscription);
Iterator<String> consumerTopicsIterator =
this.consumerTopic.iterator();
- List<List<Consumer<byte[]>>> consumers = new
ArrayList<>(this.consumerTopic.size());
+ List<List<QueueConsumer<byte[]>>> consumers = new
ArrayList<>(this.consumerTopic.size());
while (consumerTopicsIterator.hasNext()){
String topic = consumerTopicsIterator.next();
- final List<Consumer<byte[]>> subscriptions = new
ArrayList<>(this.numSubscriptions);
- final List<Future<Consumer<byte[]>>> subscriptionFutures =
+ final List<QueueConsumer<byte[]>> subscriptions = new
ArrayList<>(this.numSubscriptions);
+ final List<Future<QueueConsumer<byte[]>>> subscriptionFutures =
new ArrayList<>(this.numSubscriptions);
log.info().attr("topic", topic).log("Create subscriptions for
topic");
for (int j = 0; j < this.numSubscriptions; j++) {
String subscriberName = this.subscriptions.get(j);
- subscriptionFutures
-
.add(consumerBuilder.clone().topic(topic).subscriptionName(subscriberName)
- .subscribeAsync());
+ // V5 QueueConsumerBuilder has no clone(); build fresh per
subscription.
+ QueueConsumerBuilder<byte[]> b =
client.newQueueConsumer(Schema.bytes())
+ .receiverQueueSize(this.receiverQueueSize)
+
.subscriptionInitialPosition(this.subscriptionInitialPosition)
+
.replicateSubscriptionState(this.replicatedSubscription)
+ .topic(topic)
+ .subscriptionName(subscriberName);
+ subscriptionFutures.add(b.subscribeAsync());
}
- for (Future<Consumer<byte[]>> future : subscriptionFutures) {
+ for (Future<QueueConsumer<byte[]>> future : subscriptionFutures) {
subscriptions.add(future.get());
}
consumers.add(subscriptions);
@@ -664,13 +661,13 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
private List<Producer<byte[]>> buildProducers(PulsarClient client)
throws ExecutionException, InterruptedException {
- ProducerBuilder<byte[]> producerBuilder =
client.newProducer(Schema.BYTES)
- .sendTimeout(0, TimeUnit.SECONDS);
-
final List<Future<Producer<byte[]>>> producerFutures = new
ArrayList<>();
for (String topic : this.producerTopic) {
log.info().attr("topic", topic).log("Create producer for topic");
-
producerFutures.add(producerBuilder.clone().topic(topic).createAsync());
+ ProducerBuilder<byte[]> b = client.newProducer(Schema.bytes())
+ .sendTimeout(Duration.ZERO)
+ .topic(topic);
+ producerFutures.add(b.createAsync());
}
final List<Producer<byte[]>> producers = new
ArrayList<>(producerFutures.size());
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 654b73b70ae..7196c8948c8 100644
---
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -22,19 +22,14 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Cleanup;
import lombok.CustomLog;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
@@ -159,27 +154,12 @@ public class PerformanceProducerTest extends
MockedPulsarServiceBaseTest {
newConsumer2.close();
}
- @Test(timeOut = 20000)
- public void testBatchingDisabled() throws Exception {
- PerformanceProducer producer = new PerformanceProducer();
-
- int producerId = 0;
-
- String topic = testTopic + UUID.randomUUID();
- producer.topics = List.of(topic);
- producer.msgRate = 10;
- producer.serviceURL = pulsar.getBrokerServiceUrl();
- producer.numMessages = 500;
- producer.disableBatching = true;
-
- ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(producer)
- .enableTransaction(producer.isEnableTransaction);
- @Cleanup
- PulsarClient client = clientBuilder.build();
- ProducerBuilderImpl<byte[]> builder = (ProducerBuilderImpl<byte[]>)
producer.createProducerBuilder(client,
- producerId);
- Assert.assertFalse(builder.getConf().isBatchingEnabled());
- }
+ // testBatchingDisabled was a white-box test that cast
createProducerBuilder()'s return to the
+ // v4 ProducerBuilderImpl and inspected its config to assert batching was
off. The V5
+ // ProducerBuilder is intentionally opaque (no public conf accessor), so
this assertion shape
+ // cannot survive the migration. The regression intent —
"disableBatching=true must propagate
+ // to the configured builder" — is covered by V5's
BatchingPolicy.ofDisabled()-equivalent
+ // tests and the end-to-end perf workflow tests in this file.
@Test(timeOut = 20000)
public void testCreatePartitions() throws Exception {