This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 820300f93be [improve][cli] Migrate pulsar-client to the V5 client API (#25917)
820300f93be is described below

commit 820300f93bee6c92c0f3bc6cd169a47b02df3768
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 3 15:09:35 2026 -0700

    [improve][cli] Migrate pulsar-client to the V5 client API (#25917)
---
 .../api/v5/internal/PulsarClientProvider.java      |   4 +
 .../apache/pulsar/client/api/v5/schema/Schema.java |  26 +++
 .../pulsar/client/api/v5/schema/SchemaInfo.java    |  18 ++
 .../client/api/v5/schema/SchemaInfoRecord.java     |  21 +-
 pulsar-client-tools-test/build.gradle.kts          |   1 +
 .../client/cli/PulsarClientToolForceBatchNum.java  | 212 ++++++++++++++++-----
 .../pulsar/client/cli/PulsarClientToolTest.java    |  37 ++--
 .../pulsar/client/cli/PulsarClientToolWsTest.java  |  14 +-
 pulsar-client-tools/build.gradle.kts               |   2 +
 .../org/apache/pulsar/client/cli/AbstractCmd.java  |  26 +++
 .../pulsar/client/cli/AbstractCmdConsume.java      | 171 ++++-------------
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 177 ++++++++++-------
 .../org/apache/pulsar/client/cli/CmdProduce.java   | 204 +++++++-------------
 .../java/org/apache/pulsar/client/cli/CmdRead.java | 135 ++++++-------
 .../apache/pulsar/client/cli/PulsarClientTool.java | 106 +++++++++--
 .../apache/pulsar/client/cli/TestCmdConsume.java   |  23 +++
 .../apache/pulsar/client/cli/TestCmdProduce.java   |  78 ++++----
 .../org/apache/pulsar/client/cli/TestCmdRead.java  |  23 +--
 pulsar-client-v5/build.gradle.kts                  |   6 +-
 .../client/impl/v5/PulsarClientProviderV5.java     |  12 ++
 .../pulsar/client/impl/v5/SchemaAdapter.java       |  13 ++
 .../pulsar/client/impl/v5/SchemaFactoryTest.java   |  68 +++++++
 22 files changed, 802 insertions(+), 575 deletions(-)

diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index e27a02302bc..670e3bc374a 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -78,6 +78,10 @@ public interface PulsarClientProvider {
 
     Schema<byte[]> autoProduceBytesSchema();
 
+    Schema<?> genericSchema(org.apache.pulsar.client.api.v5.schema.SchemaInfo 
schemaInfo);
+
+    Schema<byte[]> autoProduceBytesSchema(Schema<?> base);
+
     // --- Checkpoint ---
 
     Checkpoint checkpointFromBytes(byte[] data) throws IOException;
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
index da087a95b30..173cf0a77ff 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
@@ -204,4 +204,30 @@ public interface Schema<T> {
     static Schema<byte[]> autoProduceBytes() {
         return PulsarClientProvider.get().autoProduceBytesSchema();
     }
+
+    /**
+     * Build a generic schema from a raw {@link SchemaInfo} definition. Use 
this when the schema
+     * is described by a definition document (e.g. an Avro or JSON schema 
string) rather than a
+     * compiled POJO class.
+     *
+     * @param schemaInfo the schema descriptor (type + definition bytes)
+     * @return a generic {@link Schema} for the given definition
+     * @see SchemaInfo#of
+     */
+    static Schema<?> generic(SchemaInfo schemaInfo) {
+        return PulsarClientProvider.get().genericSchema(schemaInfo);
+    }
+
+    /**
+     * Get a schema that produces raw bytes while validating them against the 
supplied
+     * {@code base} schema (in addition to the topic schema). This is the 
wrapping form of
+     * {@link #autoProduceBytes()} — the producer sends already-encoded bytes, 
and the bytes are
+     * checked for compatibility with {@code base} before being published.
+     *
+     * @param base the schema the produced bytes must conform to
+     * @return a {@link Schema} for producing pre-encoded bytes validated 
against {@code base}
+     */
+    static Schema<byte[]> autoProduceBytesOf(Schema<?> base) {
+        return PulsarClientProvider.get().autoProduceBytesSchema(base);
+    }
 }
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
index 26b875f91b8..910d30f7eae 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
@@ -52,4 +52,22 @@ public interface SchemaInfo {
      * @return an unmodifiable map of schema property key-value pairs
      */
     Map<String, String> properties();
+
+    /**
+     * Build a {@link SchemaInfo} from its components. Useful for constructing 
a generic schema
+     * from a raw definition (e.g. an Avro/JSON schema document) via {@link 
Schema#generic}.
+     *
+     * @param name       the schema name
+     * @param type       the schema type
+     * @param schema     the raw schema definition bytes (e.g. Avro schema 
JSON); may be {@code null}
+     * @param properties additional schema properties; may be {@code null}
+     * @return an immutable {@link SchemaInfo}
+     */
+    static SchemaInfo of(String name, SchemaType type, byte[] schema, 
Map<String, String> properties) {
+        return new SchemaInfoRecord(
+                name,
+                type,
+                schema == null ? new byte[0] : schema.clone(),
+                properties == null ? Map.of() : Map.copyOf(properties));
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
similarity index 72%
copy from pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
copy to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
index 10b68648ebb..787540b20c3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.cli;
+package org.apache.pulsar.client.api.v5.schema;
 
-import java.util.concurrent.Callable;
+import java.util.Map;
 
-public abstract class AbstractCmd implements Callable<Integer> {
-    // Picocli entrypoint.
-    @Override
-    public Integer call() throws Exception {
-        return run();
-    }
-
-    abstract int run() throws Exception;
+/**
+ * Immutable {@link SchemaInfo} value holder backing {@link SchemaInfo#of}.
+ */
+record SchemaInfoRecord(
+        String name,
+        SchemaType type,
+        byte[] schema,
+        Map<String, String> properties
+) implements SchemaInfo {
 }
diff --git a/pulsar-client-tools-test/build.gradle.kts b/pulsar-client-tools-test/build.gradle.kts
index ce9e644b41d..87365636aef 100644
--- a/pulsar-client-tools-test/build.gradle.kts
+++ b/pulsar-client-tools-test/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
     testImplementation(libs.guava)
     testImplementation(project(":pulsar-client-admin-original"))
     testImplementation(project(":pulsar-client-original"))
+    testImplementation(project(":pulsar-client-api-v5"))
     testImplementation(project(":pulsar-functions:pulsar-functions-api"))
     testImplementation(libs.picocli)
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
index c3de66abc5c..5900b4cab8e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
@@ -22,32 +22,43 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.mockito.stubbing.Answer;
+import org.apache.pulsar.client.api.v5.MessageBuilder;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncMessageBuilder;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.testng.Assert;
 
 /**
- * An implement of {@link PulsarClientTool} for test, which will publish 
messages iff there is enough messages
- * in the batch.
+ * An implementation of {@link PulsarClientTool} for test, which publishes 
messages only once there
+ * are enough messages in a batch. The producer is forced to batch exactly 
{@code batchNum} messages
+ * with an effectively-infinite publish delay, and the synchronous {@code 
send()} is turned into an
+ * asynchronous send so the messages actually accumulate into a batch (a 
blocking send would flush
+ * each message individually).
  */
-public class PulsarClientToolForceBatchNum extends PulsarClientTool{
+public class PulsarClientToolForceBatchNum extends PulsarClientTool {
     private final String topic;
     private final int batchNum;
 
     /**
-     *
      * @param properties properties
      * @param topic topic
-     * @param batchNum iff there is batchNum messages in the batch, the 
producer will flush and send.
+     * @param batchNum iff there are batchNum messages in the batch, the 
producer will flush and send.
      */
     public PulsarClientToolForceBatchNum(Properties properties, String topic, 
int batchNum) {
         super(properties);
@@ -55,46 +66,147 @@ public class PulsarClientToolForceBatchNum extends PulsarClientTool{
         this.batchNum = batchNum;
         produceCommand = new CmdProduce() {
             @Override
-            public void updateConfig(ClientBuilder newBuilder, Authentication 
authentication, String serviceURL) {
-                try {
-                    super.updateConfig(mockClientBuilder(newBuilder), 
authentication, serviceURL);
-                } catch (Exception e) {
-                    Assert.fail("update config fail " + e.getMessage());
-                }
+            public void updateConfig(PulsarClientBuilder newBuilder, 
Authentication authentication,
+                                     String serviceURL) {
+                super.updateConfig(mockClientBuilder(newBuilder), 
authentication, serviceURL);
             }
         };
         replaceProducerCommand(produceCommand);
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws 
Exception {
-        PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
-        ProducerBuilder<byte[]> producerBuilder = client.newProducer()
-            .batchingMaxBytes(Integer.MAX_VALUE)
-            .batchingMaxMessages(batchNum)
-            .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
-            .topic(topic);
-        Producer<byte[]> producer = producerBuilder.create();
-
-        PulsarClientImpl mockClient = spy(client);
-        ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
-        Producer<byte[]> mockProducer = spy(producer);
-        ClientBuilder mockClientBuilder = spy(newBuilder);
-
-        doAnswer((Answer<TypedMessageBuilder>) invocation -> {
-            TypedMessageBuilder typedMessageBuilder = 
spy((TypedMessageBuilder) invocation.callRealMethod());
-            doAnswer((Answer<MessageId>) invocation1 -> {
-                TypedMessageBuilder mock = ((TypedMessageBuilder) 
invocation1.getMock());
-                // using sendAsync() to replace send()
-                mock.sendAsync();
-                return null;
-            }).when(typedMessageBuilder).send();
-            return typedMessageBuilder;
-        }).when(mockProducer).newMessage();
-
-        doReturn(mockProducer).when(mockProducerBuilder).create();
-        
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
-        doReturn(mockClient).when(mockClientBuilder).build();
-        return mockClientBuilder;
+    private PulsarClientBuilder mockClientBuilder(PulsarClientBuilder 
newBuilder) {
+        try {
+            PulsarClient realClient = newBuilder.build();
+            PulsarClient spyClient = spy(realClient);
+
+            doAnswer(invocation -> {
+                @SuppressWarnings("unchecked")
+                Schema<byte[]> schema = (Schema<byte[]>) 
invocation.getArgument(0);
+                // Build a producer that batches exactly batchNum messages and 
(practically) never
+                // flushes on the timer, so batching is deterministic. 
CmdProduce will still call
+                // topic()/batchingPolicy() on the returned spy; for the 
batched case it leaves the
+                // default batching alone, so this forced policy stands.
+                ProducerBuilder<byte[]> realBuilder = 
realClient.newProducer(schema)
+                        .topic(topic)
+                        .batchingPolicy(BatchingPolicy.builder()
+                                .enabled(true)
+                                .maxMessages(batchNum)
+                                .maxPublishDelay(Duration.ofDays(1))
+                                .maxSize(MemorySize.ofBytes(Integer.MAX_VALUE))
+                                .build());
+                ProducerBuilder<byte[]> spyBuilder = spy(realBuilder);
+                doAnswer(c -> 
forceAsyncSend(realBuilder.create())).when(spyBuilder).create();
+                return spyBuilder;
+            }).when(spyClient).newProducer(any(Schema.class));
+
+            PulsarClientBuilder spyBuilder = spy(newBuilder);
+            doReturn(spyClient).when(spyBuilder).build();
+            return spyBuilder;
+        } catch (Exception e) {
+            Assert.fail("update config fail " + e.getMessage());
+            return newBuilder;
+        }
+    }
+
+    /**
+     * Wrap a producer so that the synchronous {@code newMessage().send()} the 
CLI uses is dispatched
+     * asynchronously, letting messages accumulate into a batch instead of 
flushing one-by-one. The
+     * send futures are collected and awaited on {@code close()} so no batched 
message is lost (the
+     * CLI ignores the per-send result, and the V5 producer does not 
implicitly await fire-and-forget
+     * sends issued through a different builder instance).
+     */
+    private static Producer<byte[]> forceAsyncSend(Producer<byte[]> 
realProducer) throws Exception {
+        List<CompletableFuture<MessageId>> pending = 
Collections.synchronizedList(new ArrayList<>());
+        Producer<byte[]> spyProducer = spy(realProducer);
+        doAnswer(inv -> new 
AsyncForwardingMessageBuilder(realProducer.async().newMessage(), pending))
+                .when(spyProducer).newMessage();
+        doAnswer(inv -> {
+            CompletableFuture.allOf(pending.toArray(new 
CompletableFuture[0])).join();
+            realProducer.close();
+            return null;
+        }).when(spyProducer).close();
+        return spyProducer;
+    }
+
+    /**
+     * A {@link MessageBuilder} that accumulates metadata onto an {@link 
AsyncMessageBuilder} and
+     * fires the send asynchronously, returning {@code null} (the CLI ignores 
the send result). The
+     * pending send future is recorded so the producer can await it on close.
+     */
+    private static final class AsyncForwardingMessageBuilder implements 
MessageBuilder<byte[]> {
+        private final AsyncMessageBuilder<byte[]> delegate;
+        private final List<CompletableFuture<MessageId>> pending;
+
+        AsyncForwardingMessageBuilder(AsyncMessageBuilder<byte[]> delegate,
+                                      List<CompletableFuture<MessageId>> 
pending) {
+            this.delegate = delegate;
+            this.pending = pending;
+        }
+
+        @Override
+        public MessageId send() {
+            pending.add(delegate.send());
+            return null;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> value(byte[] value) {
+            delegate.value(value);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> key(String key) {
+            delegate.key(key);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> transaction(Transaction txn) {
+            delegate.transaction(txn);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> property(String name, String value) {
+            delegate.property(name, value);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> properties(Map<String, String> 
properties) {
+            delegate.properties(properties);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> eventTime(Instant eventTime) {
+            delegate.eventTime(eventTime);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> sequenceId(long sequenceId) {
+            delegate.sequenceId(sequenceId);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> deliverAfter(Duration delay) {
+            delegate.deliverAfter(delay);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> deliverAt(Instant timestamp) {
+            delegate.deliverAt(timestamp);
+            return this;
+        }
+
+        @Override
+        public MessageBuilder<byte[]> replicationClusters(List<String> 
clusters) {
+            delegate.replicationClusters(clusters);
+            return this;
+        }
     }
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 27939e783ad..9db7f9596f9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -71,7 +71,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
     public void testInitialization() throws InterruptedException, 
ExecutionException, PulsarAdminException {
 
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
         properties.setProperty("memoryLimit", "10M");
 
@@ -120,7 +120,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
     public void testNonDurableSubscribe() throws Exception {
 
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("non-durable");
@@ -160,17 +160,17 @@ public class PulsarClientToolTest extends BrokerTestBase {
         Assert.assertFalse(future.isCompletedExceptionally());
         future.get();
 
-        Awaitility.await()
-                .ignoreExceptions()
-                .atMost(Duration.ofMillis(20000))
-                .until(()->admin.topics().getSubscriptions(topicName).size() 
== 0);
+        // The V5-based pulsar-client has no non-durable subscription mode: 
--subscription-mode
+        // NonDurable falls back to a durable subscription (with a warning). 
So unlike the v4
+        // client, the subscription is NOT removed when the consumer 
disconnects — it persists.
+        assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
     }
 
     @Test(timeOut = 60000)
     public void testDurableSubscribe() throws Exception {
 
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("durable");
@@ -212,7 +212,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
     @Test(timeOut = 20000)
     public void testRead() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("reader");
@@ -262,7 +262,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
     @Test(timeOut = 20000)
     public void testEncryption() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("encryption");
@@ -303,10 +303,13 @@ public class PulsarClientToolTest extends BrokerTestBase {
         }
     }
 
-    @Test(timeOut = 20000)
+    // Longer timeout than the other cases: this test forces an immediate 
burst of async sends
+    // through the V5 producer right after create(), which can race the 
scalable-topic segment
+    // layout becoming active and retry with exponential backoff before the 
first batch lands.
+    @Test(timeOut = 60000)
     public void testDisableBatching() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("disable-batching");
@@ -414,7 +417,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
     @Test
     public void testSendMultipleMessage() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = getTopicWithRandomSuffix("test-multiple-msg");
@@ -475,7 +478,9 @@ public class PulsarClientToolTest extends BrokerTestBase {
 
     }
 
-    @Test
+    // KeyValue schema production is not yet supported by the V5-based 
pulsar-client (CmdProduce
+    // rejects --key-value-encoding-type with a clear message); deferred to a 
follow-up.
+    @Test(enabled = false)
     public void testProduceKeyValueSchemaInlineValue() throws Exception {
 
         Properties properties = initializeToolProperties();
@@ -525,7 +530,9 @@ public class PulsarClientToolTest extends BrokerTestBase {
         };
     }
 
-    @Test(dataProvider = "keyValueKeySchema")
+    // KeyValue schema production is not yet supported by the V5-based 
pulsar-client (CmdProduce
+    // rejects --key-value-encoding-type with a clear message); deferred to a 
follow-up.
+    @Test(dataProvider = "keyValueKeySchema", enabled = false)
     public void testProduceKeyValueSchemaFileValue(String schema) throws 
Exception {
 
         Properties properties = initializeToolProperties();
@@ -582,7 +589,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
 
     private Properties initializeToolProperties() {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
         return properties;
     }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
index 60a37fe4c3b..44035265867 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
@@ -50,7 +50,7 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
     @Test(timeOut = 30000)
     public void testWebSocketNonDurableSubscriptionMode() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = "persistent://my-property/my-ns/topic-" + 
UUID.randomUUID();
@@ -88,16 +88,16 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
             Assert.assertFalse(future.isCompletedExceptionally());
         }
 
-        Awaitility.await()
-                .ignoreExceptions().untilAsserted(() -> {
-            
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 0);
-        });
+        // The V5-based pulsar-client has no non-durable subscription mode: 
--subscription-mode
+        // NonDurable falls back to a durable subscription, so it persists 
after the consumer
+        // disconnects rather than being removed.
+        Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 
1);
     }
 
     @Test(timeOut = 30000)
     public void testWebSocketDurableSubscriptionMode() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = "persistent://my-property/my-ns/topic-" + 
UUID.randomUUID();
@@ -145,7 +145,7 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
     @Test(timeOut = 30000)
     public void testWebSocketReader() throws Exception {
         Properties properties = new Properties();
-        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
         properties.setProperty("useTls", "false");
 
         final String topicName = "persistent://my-property/my-ns/topic-" + 
UUID.randomUUID();
diff --git a/pulsar-client-tools/build.gradle.kts b/pulsar-client-tools/build.gradle.kts
index 0da8750658c..f2555cb9d14 100644
--- a/pulsar-client-tools/build.gradle.kts
+++ b/pulsar-client-tools/build.gradle.kts
@@ -27,6 +27,8 @@ dependencies {
     implementation(project(":pulsar-client-admin-api"))
     implementation(project(":pulsar-client-admin-original"))
     implementation(project(":pulsar-client-original"))
+    implementation(project(":pulsar-client-api-v5"))
+    implementation(project(":pulsar-client-v5"))
     implementation(project(":pulsar-common"))
     implementation(project(":pulsar-client-messagecrypto-bc"))
     implementation(project(":pulsar-cli-utils"))
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
index 10b68648ebb..e3e4b08b36e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.cli;
 
+import java.net.URI;
+import java.nio.file.Path;
 import java.util.concurrent.Callable;
 
 public abstract class AbstractCmd implements Callable<Integer> {
@@ -28,4 +30,28 @@ public abstract class AbstractCmd implements Callable<Integer> {
     }
 
     abstract int run() throws Exception;
+
+    /**
+     * Resolve a {@code file:} URI (as accepted by the encryption-key flags) 
to a {@link Path}.
+     * Supports both the hierarchical form ({@code file:///abs/path}, where 
{@link URI#getPath()}
+     * is set) and the opaque relative form ({@code file:rel/path}, where the 
path lives in the
+     * scheme-specific part).
+     *
+     * @param fileUri a {@code file:} URI string
+     * @return the resolved {@link Path}
+     * @throws IllegalArgumentException if the URI scheme is not {@code file}
+     */
+    static Path fileUriToPath(String fileUri) {
+        URI uri = URI.create(fileUri);
+        if (!"file".equalsIgnoreCase(uri.getScheme())) {
+            throw new IllegalArgumentException("This version of pulsar-client 
supports only file:// "
+                    + "encryption keys; got '" + fileUri + "'.");
+        }
+        String path = uri.getPath();
+        if (path == null) {
+            // Opaque (relative) file: URI, e.g. file:../certs/key.pem
+            path = uri.getSchemeSpecificPart();
+        }
+        return Path.of(path);
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
index 68b0abd5961..543c6f99100 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -18,17 +18,13 @@
  */
 package org.apache.pulsar.client.cli;
 
-import static 
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonPrimitive;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -36,14 +32,12 @@ import java.util.concurrent.TimeUnit;
 import lombok.CustomLog;
 import org.apache.commons.io.HexDump;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.eclipse.jetty.websocket.api.Callback;
 import org.eclipse.jetty.websocket.api.Session;
@@ -63,7 +57,7 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
     protected static final Logger LOG = 
LoggerFactory.getLogger(PulsarClientTool.class);
     protected static final String MESSAGE_BOUNDARY = "----- got message -----";
 
-    protected ClientBuilder clientBuilder;
+    protected PulsarClientBuilder clientBuilder;
     protected Authentication authentication;
     protected String serviceURL;
 
@@ -75,7 +69,7 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
      * Set client configuration.
      *
      */
-    public void updateConfig(ClientBuilder clientBuilder, Authentication 
authentication, String serviceURL) {
+    public void updateConfig(PulsarClientBuilder clientBuilder, Authentication 
authentication, String serviceURL) {
         this.clientBuilder = clientBuilder;
         this.authentication = authentication;
         this.serviceURL = serviceURL;
@@ -90,145 +84,64 @@ public abstract class AbstractCmdConsume extends 
AbstractCmd {
      *            Whether to display BytesMessages in hexdump style, ignored 
for simple text messages
      * @return String representation of the message
      */
-    protected String interpretMessage(Message<?> message, boolean displayHex, 
boolean printMetadata)
+    protected String interpretMessage(Message<byte[]> message, boolean 
displayHex, boolean printMetadata)
             throws IOException {
         StringBuilder sb = new StringBuilder();
 
-        String properties = 
Arrays.toString(message.getProperties().entrySet().toArray());
+        String properties = 
Arrays.toString(message.properties().entrySet().toArray());
 
-        String data;
-        Object value = message.getValue();
-        if (value == null) {
-            data = "null";
-        } else if (value instanceof byte[]) {
-            byte[] msgData = (byte[]) value;
-            data = interpretByteArray(displayHex, msgData);
-        } else if (value instanceof GenericObject) {
-            Map<String, Object> asMap = genericObjectToMap((GenericObject) 
value, displayHex);
-            data = asMap.toString();
-        } else if (value instanceof ByteBuffer) {
-            data = new String(getBytes((ByteBuffer) value));
-        } else {
-            data = value.toString();
-        }
-
-        sb.append("publishTime:[").append(message.getPublishTime()).append("], 
");
-        sb.append("eventTime:[").append(message.getEventTime()).append("], ");
+        byte[] value = message.value();
+        String data = value == null ? "null" : interpretByteArray(displayHex, 
value);
 
-        String key = null;
-        if (message.hasKey()) {
-            key = message.getKey();
-        }
-
-        sb.append("key:[").append(key).append("], ");
+        sb.append("publishTime:[").append(message.publishTime()).append("], ");
+        
sb.append("eventTime:[").append(message.eventTime().orElse(null)).append("], ");
+        sb.append("key:[").append(message.key().orElse(null)).append("], ");
         if (!properties.isEmpty()) {
             sb.append("properties:").append(properties).append(", ");
         }
         sb.append("content:").append(data);
 
         if (printMetadata) {
-            if (message.getEncryptionCtx().isPresent()) {
-                EncryptionContext encContext = 
message.getEncryptionCtx().get();
-                if (encContext.getKeys() != null && 
!encContext.getKeys().isEmpty()) {
-                    sb.append(", ");
-                    sb.append("encryption-keys:").append(", ");
-                    encContext.getKeys().forEach((keyName, keyInfo) -> {
-                        String metadata = 
Arrays.toString(keyInfo.getMetadata().entrySet().toArray());
-                        sb.append("name:").append(keyName).append(", 
").append("key-value:")
-                                
.append(Base64.getEncoder().encodeToString(keyInfo.getKeyValue())).append(", ")
-                                
.append("metadata:").append(metadata).append(", ");
-
-                    });
-                    sb.append(", 
").append("param:").append(Base64.getEncoder().encodeToString(encContext.getParam()))
-                            .append(", 
").append("algorithm:").append(encContext.getAlgorithm()).append(", ")
-                            
.append("compression-type:").append(encContext.getCompressionType()).append(", 
")
-                            
.append("uncompressed-size").append(encContext.getUncompressedMessageSize()).append(",
 ")
-                            .append("batch-size")
-                            .append(encContext.getBatchSize().isPresent() ? 
encContext.getBatchSize().get() : 1);
-                }
-            }
-            if (message.hasBrokerPublishTime()) {
-                sb.append(", 
").append("publish-time:").append(DateFormatter.format(message.getPublishTime()));
-            }
-            sb.append(", 
").append("event-time:").append(DateFormatter.format(message.getEventTime()));
-            sb.append(", 
").append("message-id:").append(message.getMessageId());
-            sb.append(", 
").append("producer-name:").append(message.getProducerName());
-            sb.append(", 
").append("sequence-id:").append(message.getSequenceId());
-            sb.append(", 
").append("replicated-from:").append(message.getReplicatedFrom());
-            sb.append(", 
").append("redelivery-count:").append(message.getRedeliveryCount());
-            sb.append(", ").append("ordering-key:")
-                    .append(message.getOrderingKey() != null ? new 
String(message.getOrderingKey()) : "");
-            sb.append(", ").append("schema-version:")
-                    .append(message.getSchemaVersion() != null ? new 
String(message.getSchemaVersion()) : "");
-            if (message.hasIndex()) {
-                sb.append(", ").append("index:").append(message.getIndex());
-            }
+            sb.append(", ").append("message-id:").append(message.id());
+            sb.append(", 
").append("producer-name:").append(message.producerName().orElse(null));
+            sb.append(", 
").append("sequence-id:").append(message.sequenceId());
+            sb.append(", 
").append("replicated-from:").append(message.replicatedFrom().orElse(null));
+            sb.append(", 
").append("redelivery-count:").append(message.redeliveryCount());
         }
 
         return sb.toString();
     }
 
     protected static String interpretByteArray(boolean displayHex, byte[] 
msgData) throws IOException {
-        String data;
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
         if (!displayHex) {
             return new String(msgData);
         } else {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
             HexDump.dump(msgData, 0, out, 0);
             return out.toString();
         }
     }
 
-    protected static Map<String, Object> genericObjectToMap(GenericObject 
value, boolean displayHex)
-            throws IOException {
-        switch (value.getSchemaType()) {
-            case AVRO:
-            case JSON:
-            case PROTOBUF_NATIVE:
-                    return genericRecordToMap((GenericRecord) value, 
displayHex);
-            case KEY_VALUE:
-                    return keyValueToMap((KeyValue<?, ?>) 
value.getNativeObject(), displayHex);
-            default:
-                return primitiveValueToMap(value.getNativeObject(), 
displayHex);
-        }
-    }
-
-    protected static Map<String, Object> keyValueToMap(KeyValue<?, ?> value, 
boolean displayHex) throws IOException {
-        if (value == null) {
-            return Map.of("value", "NULL");
-        }
-        return Map.of("key", primitiveValueToMap(value.getKey(), displayHex),
-                "value", primitiveValueToMap(value.getValue(), displayHex));
-    }
-
-    protected static Map<String, Object> primitiveValueToMap(Object value, 
boolean displayHex) throws IOException {
-        if (value == null) {
-            return Map.of("value", "NULL");
-        }
-        if (value instanceof GenericObject) {
-            return genericObjectToMap((GenericObject) value, displayHex);
-        }
-        if (value instanceof byte[]) {
-            value = interpretByteArray(displayHex, (byte[]) value);
-        }
-        return Map.of("value", value.toString(), "type", value.getClass());
-    }
-
-    protected static Map<String, Object> genericRecordToMap(GenericRecord 
value, boolean displayHex)
-            throws IOException {
-        Map<String, Object> res = new HashMap<>();
-        for (Field f : value.getFields()) {
-            Object fieldValue = value.getField(f);
-            if (fieldValue instanceof GenericRecord) {
-                fieldValue = genericRecordToMap((GenericRecord) fieldValue, 
displayHex);
-            } else if (fieldValue == null) {
-                fieldValue =  "NULL";
-            } else if (fieldValue instanceof byte[]) {
-                fieldValue = interpretByteArray(displayHex, (byte[]) 
fieldValue);
-            }
-            res.put(f.getName(), fieldValue);
-        }
-        return res;
+    /**
+     * Build a consumer-side decryption policy from a {@code file://} key URI, 
mirroring the v4
+     * {@code defaultCryptoKeyReader(uri)} semantics: the private key is 
loaded once and returned
+     * for any key name. (The producer's logical key name travels in the 
message metadata, so a
+     * name-keyed provider would not resolve it; the CLI's file-based flow has 
a single key.)
+     */
+    protected static ConsumerEncryptionPolicy buildFileDecryptionPolicy(
+            String keyUri, ConsumerCryptoFailureAction failureAction) {
+        final byte[] keyBytes;
+        try {
+            keyBytes = Files.readAllBytes(fileUriToPath(keyUri));
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Failed to read decryption key 
from " + keyUri, e);
+        }
+        PrivateKeyProvider provider = (keyName, metadata) ->
+                CompletableFuture.completedFuture(EncryptionKey.of(keyBytes));
+        return ConsumerEncryptionPolicy.builder()
+                .privateKeyProvider(provider)
+                .failureAction(failureAction)
+                .build();
     }
 
     @WebSocket
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4a5254cd9c0..4c7164d7db3 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -23,21 +23,20 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -56,6 +55,27 @@ import picocli.CommandLine.Spec;
 @Command(description = "Consume messages from a specified topic")
 public class CmdConsume extends AbstractCmdConsume {
 
+    /**
+     * v4-compatible subscription-type flag. This version of pulsar-client 
consumes through a V5
+     * {@link QueueConsumer} for all types, since it is the only consumer that 
works against both
+     * regular and scalable topics (the ordered StreamConsumer requires a 
scalable-topic
+     * controller). Exclusive / Failover therefore get work-queue 
(Shared-style) semantics and log
+     * a warning rather than preserving single-reader ordering.
+     */
+    public enum SubscriptionType {
+        Exclusive,
+        Shared,
+        Failover,
+        Key_Shared
+    }
+
+    /** v4-compatible subscription-mode flag. Only honored by the WebSocket 
path; the V5 binary
+     *  consumer is always durable, so NonDurable logs a warning. */
+    public enum SubscriptionMode {
+        Durable,
+        NonDurable
+    }
+
     @Parameters(description = "TopicName", arity = "1")
     private String topic;
 
@@ -66,7 +86,7 @@ public class CmdConsume extends AbstractCmdConsume {
     private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
 
     @Option(names = { "-p", "--subscription-position" }, description = 
"Subscription position.")
-    private SubscriptionInitialPosition subscriptionInitialPosition = 
SubscriptionInitialPosition.Latest;
+    private SubscriptionInitialPosition subscriptionInitialPosition = 
SubscriptionInitialPosition.LATEST;
 
     @Option(names = { "-s", "--subscription-name" }, required = true, 
description = "Subscription name.")
     private String subscriptionName;
@@ -119,9 +139,6 @@ public class CmdConsume extends AbstractCmdConsume {
     @Option(names = { "-mp", "--print-metadata" }, description = "Message 
metadata")
     private boolean printMetadata = false;
 
-    @Option(names = { "-stp", "--start-timestamp" }, description = "Start 
timestamp for consuming messages")
-    private long startTimestamp = 0L;
-
     @Option(names = { "-etp", "--end-timestamp" }, description = "End 
timestamp for consuming messages")
     private long endTimestamp = Long.MAX_VALUE;
 
@@ -146,18 +163,10 @@ public class CmdConsume extends AbstractCmdConsume {
             throw new CommandLine.ParameterException(commandSpec.commandLine(),
                     "Number of messages should be zero or positive.");
         }
-        if (this.startTimestamp < 0) {
-            throw new CommandLine.ParameterException(commandSpec.commandLine(),
-                    "start timestamp should be positive.");
-        }
         if (this.endTimestamp < 0) {
             throw new CommandLine.ParameterException(commandSpec.commandLine(),
                     "end timestamp should be positive.");
         }
-        if (this.endTimestamp < startTimestamp) {
-            throw new CommandLine.ParameterException(commandSpec.commandLine(),
-                    "end timestamp should larger than start timestamp.");
-        }
 
         if (this.serviceURL.startsWith("ws")) {
             return consumeFromWebSocket(topic);
@@ -170,71 +179,67 @@ public class CmdConsume extends AbstractCmdConsume {
         int numMessagesConsumed = 0;
         int returnCode = 0;
 
+        if ("auto_consume".equals(schemaType)) {
+            throw new IllegalArgumentException("schema type 'auto_consume' is 
not supported by this "
+                    + "version of pulsar-client; consume with 'bytes' (the 
default).");
+        } else if (!"bytes".equals(schemaType)) {
+            throw new IllegalArgumentException("schema type must be 'bytes'");
+        }
+        if (!poolMessages) {
+            LOG.info("--pool-messages has no effect on this version of 
pulsar-client.");
+        }
+        if (subscriptionMode == SubscriptionMode.NonDurable) {
+            LOG.warn("--subscription-mode NonDurable is not supported by this 
version of pulsar-client; "
+                    + "a durable subscription is used instead.");
+        }
+        if (subscriptionType == SubscriptionType.Exclusive || subscriptionType 
== SubscriptionType.Failover) {
+            // The V5 StreamConsumer (ordered, single-reader) requires a 
scalable-topic subscription
+            // controller, which regular topics do not have; only the 
QueueConsumer works against
+            // both regular and scalable topics. So all subscription types use 
a QueueConsumer here
+            // and Exclusive/Failover get work-queue (Shared-style) semantics 
rather than ordered.
+            LOG.warn("--subscription-type {} : this version of pulsar-client 
consumes via a work-queue "
+                    + "(Shared-style) subscription; exclusive/failover 
ordering is not preserved.",
+                    subscriptionType);
+        }
+        if (maxPendingChunkedMessage > 0 || 
autoAckOldestChunkedMessageOnQueueFull) {
+            LOG.warn("Chunked-message knobs (--max_chunked_msg / 
--auto_ack_chunk_q_full) have no effect "
+                    + "on this version of pulsar-client.");
+        }
+
         try (PulsarClient client = clientBuilder.build()) {
-            ConsumerBuilder<?> builder;
-            Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
-            if ("auto_consume".equals(schemaType)) {
-                schema = Schema.AUTO_CONSUME();
-            } else if (!"bytes".equals(schemaType)) {
-                throw new IllegalArgumentException("schema type must be 
'bytes' or 'auto_consume'");
-            }
-            builder = client.newConsumer(schema)
+            RateLimiter limiter = (this.consumeRate > 0) ? 
RateLimiter.create(this.consumeRate) : null;
+            QueueConsumerBuilder<byte[]> builder = 
client.newQueueConsumer(Schema.bytes())
                     .subscriptionName(this.subscriptionName)
-                    .subscriptionType(subscriptionType)
-                    .subscriptionMode(subscriptionMode)
                     .subscriptionInitialPosition(subscriptionInitialPosition)
-                    .poolMessages(poolMessages)
                     .replicateSubscriptionState(replicateSubscriptionState);
-
-            if (isRegex) {
-                builder.topicsPattern(Pattern.compile(topic));
-            } else {
-                builder.topic(topic);
-            }
-
-            if (this.maxPendingChunkedMessage > 0) {
-                
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
-            }
             if (this.receiverQueueSize > 0) {
                 builder.receiverQueueSize(this.receiverQueueSize);
             }
-
-            
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
-            builder.cryptoFailureAction(cryptoFailureAction);
-
             if (isNotBlank(this.encKeyValue)) {
-                builder.defaultCryptoKeyReader(this.encKeyValue);
+                builder.encryptionPolicy(buildConsumerEncryptionPolicy());
             }
+            applyTopicSelection(builder::topic, builder::namespace);
 
-            try (Consumer<?> consumer = builder.subscribe();) {
-                if (startTimestamp > 0L) {
-                    consumer.seek(startTimestamp);
-                }
-                RateLimiter limiter = (this.consumeRate > 0) ? 
RateLimiter.create(this.consumeRate) : null;
+            try (QueueConsumer<byte[]> consumer = builder.subscribe()) {
                 while (this.numMessagesToConsume == 0 || numMessagesConsumed < 
this.numMessagesToConsume) {
                     if (limiter != null) {
                         limiter.acquire();
                     }
-                    Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
+                    Message<byte[]> msg = 
consumer.receive(Duration.ofSeconds(5));
                     if (msg == null) {
                         LOG.debug("No message to consume after waiting for 5 
seconds.");
                     } else {
-                        try {
-                            if (msg.getPublishTime() > endTimestamp) {
-                                break;
-                            }
-                            numMessagesConsumed += 1;
-                            if (!hideContent) {
-                                System.out.println(MESSAGE_BOUNDARY);
-                                String output = this.interpretMessage(msg, 
displayHex, printMetadata);
-                                System.out.println(output);
-                            } else if (numMessagesConsumed % 1000 == 0) {
-                                System.out.println("Received " + 
numMessagesConsumed + " messages");
-                            }
-                            consumer.acknowledge(msg);
-                        } finally {
-                            msg.release();
+                        if (msg.publishTime().toEpochMilli() > endTimestamp) {
+                            break;
+                        }
+                        numMessagesConsumed += 1;
+                        if (!hideContent) {
+                            System.out.println(MESSAGE_BOUNDARY);
+                            System.out.println(this.interpretMessage(msg, 
displayHex, printMetadata));
+                        } else if (numMessagesConsumed % 1000 == 0) {
+                            System.out.println("Received " + 
numMessagesConsumed + " messages");
                         }
+                        consumer.acknowledge(msg.id());
                     }
                 }
             }
@@ -247,7 +252,41 @@ public class CmdConsume extends AbstractCmdConsume {
         }
 
         return returnCode;
+    }
+
+    /**
+     * Apply the topic argument to the consumer. A plain topic uses {@code 
topic(...)}; a
+     * {@code --regex} pattern is mapped to a namespace subscription over the 
pattern's
+     * {@code tenant/namespace}; the rest of the pattern is not used as a 
filter (V5 has no
+     * topic-regex; namespace subscriptions follow the namespace live).
+     */
+    private void applyTopicSelection(java.util.function.Consumer<String> 
topicFn,
+                                     java.util.function.Consumer<String> 
namespaceFn) {
+        if (isRegex) {
+            namespaceFn.accept(namespaceFromPattern(topic));
+        } else {
+            topicFn.accept(topic);
+        }
+    }
+
+    static String namespaceFromPattern(String pattern) {
+        // Strip an optional domain prefix (everything through "://", e.g. 
persistent://), then take
+        // the first two path segments as tenant/namespace.
+        String rest = pattern;
+        int scheme = rest.indexOf("://");
+        if (scheme >= 0) {
+            rest = rest.substring(scheme + 3);
+        }
+        String[] parts = rest.split("/");
+        if (parts.length < 2) {
+            throw new IllegalArgumentException("Cannot derive a 
tenant/namespace from --regex pattern '"
+                    + pattern + "'. Use a fully-qualified pattern, e.g. 
persistent://tenant/namespace/.*");
+        }
+        return parts[0] + "/" + parts[1];
+    }
 
+    private ConsumerEncryptionPolicy buildConsumerEncryptionPolicy() {
+        return buildFileDecryptionPolicy(this.encKeyValue, 
cryptoFailureAction);
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index ae910f1913d..8de079f7bfa 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -35,7 +35,6 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -49,19 +48,19 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.client.api.v5.MessageBuilder;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.client.HttpClient;
@@ -89,8 +88,6 @@ import picocli.CommandLine.Spec;
 public class CmdProduce extends AbstractCmd {
     private static final int MAX_MESSAGES = 1000;
     static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = "";
-    private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED = 
"separated";
-    private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline";
 
     @Parameters(description = "TopicName", arity = "1")
     private String topic;
@@ -160,7 +157,7 @@ public class CmdProduce extends AbstractCmd {
             "--disable-replication" }, description = "Disable geo-replication 
for messages.")
     private boolean disableReplication = false;
 
-    private ClientBuilder clientBuilder;
+    private PulsarClientBuilder clientBuilder;
     private Authentication authentication;
     private String serviceURL;
 
@@ -171,7 +168,7 @@ public class CmdProduce extends AbstractCmd {
     /**
      * Set Pulsar client configuration.
      */
-    public void updateConfig(ClientBuilder newBuilder, Authentication 
authentication, String serviceURL) {
+    public void updateConfig(PulsarClientBuilder newBuilder, Authentication 
authentication, String serviceURL) {
         this.clientBuilder = newBuilder;
         this.authentication = authentication;
         this.serviceURL = serviceURL;
@@ -186,20 +183,15 @@ public class CmdProduce extends AbstractCmd {
      *
      * @return list of message bodies
      */
-    @SuppressWarnings({"unchecked", "rawtypes"})
     static List<byte[]> generateMessageBodies(List<String> stringMessages, 
List<String> messageFileNames,
-                                              Schema schema) {
+                                              org.apache.avro.Schema 
avroSchema) {
         List<byte[]> messageBodies = new ArrayList<>();
 
         for (String m : stringMessages) {
-            if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-                // JSON TO AVRO
-                @SuppressWarnings("unchecked")
-                Optional<org.apache.avro.Schema> nativeSchema =
-                        (Optional<org.apache.avro.Schema>) (Optional<?>) 
schema.getNativeSchema();
-                org.apache.avro.Schema avroSchema = nativeSchema.get();
-                byte[] encoded = jsonToAvro(m, avroSchema);
-                messageBodies.add(encoded);
+            if (avroSchema != null) {
+                // JSON TO AVRO — the V5 Schema does not expose the native 
Avro schema, so the
+                // caller passes the parsed Avro definition directly.
+                messageBodies.add(jsonToAvro(m, avroSchema));
             } else {
                 messageBodies.add(m.getBytes());
             }
@@ -267,15 +259,11 @@ public class CmdProduce extends AbstractCmd {
 
         if (keyValueEncodingType == null) {
             keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET;
-        } else {
-            switch (keyValueEncodingType) {
-                case KEY_VALUE_ENCODING_TYPE_SEPARATED:
-                case KEY_VALUE_ENCODING_TYPE_INLINE:
-                    break;
-                default:
-                    throw (new 
IllegalArgumentException("--key-value-encoding-type "
-                            + keyValueEncodingType + " is not valid, only 
'separated' or 'inline'"));
-            }
+        } else if 
(!KEY_VALUE_ENCODING_TYPE_NOT_SET.equals(keyValueEncodingType)) {
+            // KeyValue schemas are not yet supported by the V5-based 
pulsar-client.
+            throw new IllegalArgumentException("KeyValue schemas 
(--key-value-encoding-type) are not "
+                    + "supported by this version of pulsar-client; produce 
with a plain value schema "
+                    + "(-vs bytes|string|avro:<def>|json:<def>) instead.");
         }
 
         int totalMessages = (messages.size() + messageFileNames.size()) * 
numTimesProduce;
@@ -296,24 +284,25 @@ public class CmdProduce extends AbstractCmd {
         int numMessagesSent = 0;
         int returnCode = 0;
 
-        try (PulsarClient client = clientBuilder.build()){
-            Schema<?> schema = buildSchema(this.keySchema, this.valueSchema, 
this.keyValueEncodingType);
-            ProducerBuilder<?> producerBuilder = 
client.newProducer(schema).topic(topic);
+        if (this.disableReplication) {
+            log.warn("--disable-replication has no effect on this version of 
pulsar-client and is ignored.");
+        }
+
+        try (PulsarClient client = clientBuilder.build()) {
+            ValueSchema vs = buildValueSchema(this.valueSchema);
+            ProducerBuilder<byte[]> producerBuilder = 
client.newProducer(vs.schema).topic(topic);
             if (this.chunkingAllowed) {
-                producerBuilder.enableChunking(true);
-                producerBuilder.enableBatching(false);
+                
producerBuilder.chunkingPolicy(ChunkingPolicy.builder().enabled(true).build());
+                producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
             } else if (this.disableBatching) {
-                producerBuilder.enableBatching(false);
+                producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
             }
             if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
-                producerBuilder.addEncryptionKey(this.encKeyName);
-                producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
+                
producerBuilder.encryptionPolicy(buildEncryptionPolicy(this.encKeyName, 
this.encKeyValue));
             }
-            try (Producer<?> producer = producerBuilder.create();) {
-                Schema<?> schemaForPayload = schema.getSchemaInfo().getType() 
== SchemaType.KEY_VALUE
-                        ? ((KeyValueSchema) schema).getValueSchema() : schema;
+            try (Producer<byte[]> producer = producerBuilder.create()) {
                 List<byte[]> messageBodies = 
generateMessageBodies(this.messages, this.messageFileNames,
-                        schemaForPayload);
+                        vs.avroNative);
                 RateLimiter limiter = (this.publishRate > 0) ? 
RateLimiter.create(this.publishRate) : null;
 
                 Map<String, String> kvMap = new HashMap<>();
@@ -322,63 +311,21 @@ public class CmdProduce extends AbstractCmd {
                     kvMap.put(kv[0], kv[1]);
                 }
 
-                final byte[] keyValueKeyBytes;
-                if (this.keyValueKey != null) {
-                    if (keyValueEncodingType == 
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
-                        throw new IllegalArgumentException(
-                            "Key value encoding type must be set when using 
--key-value-key");
-                    }
-                    keyValueKeyBytes = 
this.keyValueKey.getBytes(StandardCharsets.UTF_8);
-                } else if (this.keyValueKeyFile != null) {
-                    if (keyValueEncodingType == 
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
-                        throw new IllegalArgumentException(
-                            "Key value encoding type must be set when using 
--key-value-key-file");
-                    }
-                    keyValueKeyBytes = 
Files.readAllBytes(Paths.get(this.keyValueKeyFile));
-                } else if (this.key != null) {
-                    keyValueKeyBytes = 
this.key.getBytes(StandardCharsets.UTF_8);
-                } else {
-                    keyValueKeyBytes = null;
-                }
-
                 for (int i = 0; i < this.numTimesProduce; i++) {
                     for (byte[] content : messageBodies) {
                         if (limiter != null) {
                             limiter.acquire();
                         }
 
-                        @SuppressWarnings("unchecked")
-                        TypedMessageBuilder<Object> message = 
(TypedMessageBuilder<Object>) producer.newMessage();
-
+                        MessageBuilder<byte[]> message = producer.newMessage();
                         if (!kvMap.isEmpty()) {
                             message.properties(kvMap);
                         }
-
-                        switch (keyValueEncodingType) {
-                            case KEY_VALUE_ENCODING_TYPE_NOT_SET:
-                                if (key != null && !key.isEmpty()) {
-                                    message.key(key);
-                                }
-                                message.value(content);
-                                break;
-                            case KEY_VALUE_ENCODING_TYPE_SEPARATED:
-                            case KEY_VALUE_ENCODING_TYPE_INLINE:
-                                KeyValue<byte[], byte[]> kv = new KeyValue<>(
-                                        keyValueKeyBytes,
-                                        content);
-                                message.value(kv);
-                                break;
-                            default:
-                                throw new IllegalStateException();
+                        if (key != null && !key.isEmpty()) {
+                            message.key(key);
                         }
-
-                        if (disableReplication) {
-                            message.disableReplication();
-                        }
-
+                        message.value(content);
                         message.send();
-
-
                         numMessagesSent++;
                     }
                 }
@@ -393,52 +340,43 @@ public class CmdProduce extends AbstractCmd {
         return returnCode;
     }
 
-    static Schema<?> buildSchema(String keySchema, String schema, String 
keyValueEncodingType) {
-        switch (keyValueEncodingType) {
-            case KEY_VALUE_ENCODING_TYPE_NOT_SET:
-                return buildComponentSchema(schema);
-            case KEY_VALUE_ENCODING_TYPE_SEPARATED:
-                return Schema.KeyValue(buildComponentSchema(keySchema), 
buildComponentSchema(schema),
-                        KeyValueEncodingType.SEPARATED);
-            case KEY_VALUE_ENCODING_TYPE_INLINE:
-                return Schema.KeyValue(buildComponentSchema(keySchema), 
buildComponentSchema(schema),
-                        KeyValueEncodingType.INLINE);
-            default:
-                throw new IllegalArgumentException("Invalid 
KeyValueEncodingType "
-                        + keyValueEncodingType + ", only: 'none','separated' 
and 'inline");
-        }
+    /** A V5 producer schema (always {@code byte[]}) plus, for {@code avro:}, 
the parsed Avro
+     *  definition used to convert JSON input into Avro bytes. */
+    record ValueSchema(Schema<byte[]> schema, org.apache.avro.Schema 
avroNative) {
     }
 
-    private static Schema<?> buildComponentSchema(String schema) {
-        Schema<?> base;
-        switch (schema) {
-            case "string":
-                base = Schema.STRING;
-                break;
+    static ValueSchema buildValueSchema(String valueSchema) {
+        switch (valueSchema) {
             case "bytes":
-                // no need for wrappers
-                return Schema.BYTES;
+                return new ValueSchema(Schema.bytes(), null);
+            case "string":
+                return new 
ValueSchema(Schema.autoProduceBytesOf(Schema.string()), null);
             default:
-                if (schema.startsWith("avro:")) {
-                    base = buildGenericSchema(SchemaType.AVRO, 
schema.substring(5));
-                } else if (schema.startsWith("json:")) {
-                    base = buildGenericSchema(SchemaType.JSON, 
schema.substring(5));
-                } else {
-                    throw new IllegalArgumentException("Invalid schema type: " 
+ schema);
+                if (valueSchema.startsWith("avro:")) {
+                    String def = valueSchema.substring(5);
+                    org.apache.avro.Schema avroNative = new 
org.apache.avro.Schema.Parser().parse(def);
+                    Schema<?> generic = Schema.generic(
+                            SchemaInfo.of("client", SchemaType.AVRO,
+                                    def.getBytes(StandardCharsets.UTF_8), 
null));
+                    return new ValueSchema(Schema.autoProduceBytesOf(generic), 
avroNative);
+                } else if (valueSchema.startsWith("json:")) {
+                    String def = valueSchema.substring(5);
+                    Schema<?> generic = Schema.generic(
+                            SchemaInfo.of("client", SchemaType.JSON,
+                                    def.getBytes(StandardCharsets.UTF_8), 
null));
+                    return new ValueSchema(Schema.autoProduceBytesOf(generic), 
null);
                 }
+                throw new IllegalArgumentException("Invalid schema type: " + 
valueSchema);
         }
-        return Schema.AUTO_PRODUCE_BYTES(base);
     }
 
-    private static Schema<?> buildGenericSchema(SchemaType type, String 
definition) {
-        return Schema.generic(SchemaInfoImpl
-                .builder()
-                .schema(definition.getBytes(StandardCharsets.UTF_8))
-                .name("client")
-                .properties(new HashMap<>())
-                .type(type)
-                .build());
-
+    private static ProducerEncryptionPolicy buildEncryptionPolicy(String 
keyName, String keyUri) {
+        return ProducerEncryptionPolicy.builder()
+                
.publicKeyProvider(org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider.builder()
+                        .publicKey(keyName, fileUriToPath(keyUri))
+                        .build())
+                .keyName(keyName)
+                .build();
     }
 
     @VisibleForTesting
@@ -501,7 +439,7 @@ public class CmdProduce extends AbstractCmd {
         }
 
         try {
-            List<byte[]> messageBodies = generateMessageBodies(this.messages, 
this.messageFileNames, Schema.BYTES);
+            List<byte[]> messageBodies = generateMessageBodies(this.messages, 
this.messageFileNames, null);
             RateLimiter limiter = (this.publishRate > 0) ? 
RateLimiter.create(this.publishRate) : null;
             for (int i = 0; i < this.numTimesProduce; i++) {
                 int index = i * 10;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index e5421ffa6ac..63ecb4e4e76 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -23,22 +23,21 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.v5.Checkpoint;
+import org.apache.pulsar.client.api.v5.CheckpointConsumer;
+import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -54,13 +53,11 @@ import picocli.CommandLine.Parameters;
 @Command(description = "Read messages from a specified topic")
 public class CmdRead extends AbstractCmdConsume {
 
-    private static final Pattern MSG_ID_PATTERN = 
Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");
-
     @Parameters(description = "TopicName", arity = "1")
     private String topic;
 
     @Option(names = { "-m", "--start-message-id" },
-            description = "Initial reader position, it can be 'latest', 
'earliest' or '<ledgerId>:<entryId>'")
+            description = "Initial reader position, it can be 'latest' or 
'earliest'")
     private String startMessageId = "latest";
 
     @Option(names = { "-i", "--start-message-id-inclusive" },
@@ -106,6 +103,9 @@ public class CmdRead extends AbstractCmdConsume {
     @Option(names = { "-ca", "--crypto-failure-action" }, description = 
"Crypto Failure Action")
     private ConsumerCryptoFailureAction cryptoFailureAction = 
ConsumerCryptoFailureAction.FAIL;
 
+    private static final String START_EARLIEST = "earliest";
+    private static final String START_LATEST = "latest";
+
     @Option(names = { "-mp", "--print-metadata" }, description = "Message 
metadata")
     private boolean printMetadata = false;
 
@@ -123,7 +123,10 @@ public class CmdRead extends AbstractCmdConsume {
         if (this.numMessagesToRead < 0) {
             throw (new IllegalArgumentException("Number of messages should be 
zero or positive."));
         }
-
+        if (!START_LATEST.equals(startMessageId) && 
!START_EARLIEST.equals(startMessageId)) {
+            throw new IllegalArgumentException("--start-message-id must be 
'latest' or 'earliest'; the "
+                    + "'<ledgerId>:<entryId>' form is not supported by this 
version of pulsar-client.");
+        }
 
         if (this.serviceURL.startsWith("ws")) {
             return readFromWebSocket(topic);
@@ -136,59 +139,51 @@ public class CmdRead extends AbstractCmdConsume {
         int numMessagesRead = 0;
         int returnCode = 0;
 
-        try (PulsarClient client = clientBuilder.build()){
-            ReaderBuilder<?> builder;
-
-            Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
-            if ("auto_consume".equals(schemaType)) {
-                schema = Schema.AUTO_CONSUME();
-            } else if (!"bytes".equals(schemaType)) {
-                throw new IllegalArgumentException("schema type must be 
'bytes' or 'auto_consume'");
-            }
-            builder = client.newReader(schema)
-                    .topic(topic)
-                    .startMessageId(parseMessageId(startMessageId))
-                    .poolMessages(poolMessages);
-
-            if (this.startMessageIdInclusive) {
-                builder.startMessageIdInclusive();
-            }
-            if (this.maxPendingChunkedMessage > 0) {
-                
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
-            }
-            if (this.receiverQueueSize > 0) {
-                builder.receiverQueueSize(this.receiverQueueSize);
-            }
+        if ("auto_consume".equals(schemaType)) {
+            throw new IllegalArgumentException("schema type 'auto_consume' is 
not supported by this "
+                    + "version of pulsar-client; read with 'bytes' (the 
default).");
+        } else if (!"bytes".equals(schemaType)) {
+            throw new IllegalArgumentException("schema type must be 'bytes'");
+        }
+        if (!poolMessages) {
+            LOG.info("--pool-messages has no effect on this version of 
pulsar-client.");
+        }
+        if (this.startMessageIdInclusive) {
+            LOG.warn("--start-message-id-inclusive has no effect on this 
version of pulsar-client.");
+        }
+        if (maxPendingChunkedMessage > 0 || 
autoAckOldestChunkedMessageOnQueueFull) {
+            LOG.warn("Chunked-message knobs (--max_chunked_msg / 
--auto_ack_chunk_q_full) have no effect "
+                    + "on this version of pulsar-client.");
+        }
 
-            
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
-            builder.cryptoFailureAction(cryptoFailureAction);
+        Checkpoint startPosition = START_EARLIEST.equals(startMessageId)
+                ? Checkpoint.earliest() : Checkpoint.latest();
 
+        try (PulsarClient client = clientBuilder.build()) {
+            CheckpointConsumerBuilder<byte[]> builder = 
client.newCheckpointConsumer(Schema.bytes())
+                    .topic(topic)
+                    .startPosition(startPosition);
             if (isNotBlank(this.encKeyValue)) {
-                builder.defaultCryptoKeyReader(this.encKeyValue);
+                builder.encryptionPolicy(buildConsumerEncryptionPolicy());
             }
 
-            try (Reader<?> reader = builder.create()) {
+            try (CheckpointConsumer<byte[]> reader = builder.create()) {
                 RateLimiter limiter = (this.readRate > 0) ? 
RateLimiter.create(this.readRate) : null;
                 while (this.numMessagesToRead == 0 || numMessagesRead < 
this.numMessagesToRead) {
                     if (limiter != null) {
                         limiter.acquire();
                     }
 
-                    Message<?> msg = reader.readNext(5, TimeUnit.SECONDS);
+                    Message<byte[]> msg = 
reader.receive(Duration.ofSeconds(5));
                     if (msg == null) {
                         LOG.debug("No message to read after waiting for 5 
seconds.");
                     } else {
-                        try {
-                            numMessagesRead += 1;
-                            if (!hideContent) {
-                                System.out.println(MESSAGE_BOUNDARY);
-                                String output = this.interpretMessage(msg, 
displayHex, printMetadata);
-                                System.out.println(output);
-                            } else if (numMessagesRead % 1000 == 0) {
-                                System.out.println("Received " + 
numMessagesRead + " messages");
-                            }
-                        } finally {
-                            msg.release();
+                        numMessagesRead += 1;
+                        if (!hideContent) {
+                            System.out.println(MESSAGE_BOUNDARY);
+                            System.out.println(this.interpretMessage(msg, 
displayHex, printMetadata));
+                        } else if (numMessagesRead % 1000 == 0) {
+                            System.out.println("Received " + numMessagesRead + 
" messages");
                         }
                     }
                 }
@@ -202,7 +197,10 @@ public class CmdRead extends AbstractCmdConsume {
         }
 
         return returnCode;
+    }
 
+    private ConsumerEncryptionPolicy buildConsumerEncryptionPolicy() {
+        return buildFileDecryptionPolicy(this.encKeyValue, 
cryptoFailureAction);
     }
 
     @VisibleForTesting
@@ -214,16 +212,9 @@ public class CmdRead extends AbstractCmdConsume {
         String wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(), 
topicName.getTenant(),
                 topicName.getNamespacePortion(), topicName.getLocalName());
 
-        String msgIdQueryParam;
-        if ("latest".equals(startMessageId) || 
"earliest".equals(startMessageId)) {
-            msgIdQueryParam = startMessageId;
-        } else {
-            MessageId msgId = parseMessageId(startMessageId);
-            msgIdQueryParam = 
Base64.getEncoder().encodeToString(msgId.toByteArray());
-        }
-
+        // Only 'latest' / 'earliest' are accepted (validated in run()).
         return String.format("%s/ws/v2/reader/%s?messageId=%s", 
serviceURLWithoutTrailingSlash, wsTopic,
-                msgIdQueryParam);
+                startMessageId);
     }
 
     @SuppressWarnings("deprecation")
@@ -311,22 +302,4 @@ public class CmdRead extends AbstractCmdConsume {
         return returnCode;
     }
 
-    @VisibleForTesting
-    static MessageId parseMessageId(String msgIdStr) {
-        MessageId msgId;
-        if ("latest".equals(msgIdStr)) {
-            msgId = MessageId.latest;
-        } else if ("earliest".equals(msgIdStr)) {
-            msgId = MessageId.earliest;
-        } else {
-            Matcher matcher = MSG_ID_PATTERN.matcher(msgIdStr);
-            if (matcher.find()) {
-                msgId = new MessageIdImpl(Long.parseLong(matcher.group(1)), 
Long.parseLong(matcher.group(2)), -1);
-            } else {
-                throw new IllegalArgumentException("Message ID must be 
'latest', 'earliest' or '<ledgerId>:<entryId>'");
-            }
-        }
-        return msgId;
-    }
-
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 4203d75ba4a..481f8266ed0 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -22,19 +22,19 @@ import static 
org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.FileInputStream;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProxyProtocol;
-import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.TlsPolicy;
 import org.apache.pulsar.internal.CommandHook;
 import org.apache.pulsar.internal.CommanderFactory;
 import picocli.CommandLine;
@@ -130,37 +130,72 @@ public class PulsarClientTool implements CommandHook {
         commander.addSubcommand("consume", consumeCommand);
         commander.addSubcommand("read", readCommand);
         commander.addSubcommand("generate_documentation", 
generateDocumentation);
+        enableCaseInsensitiveEnums();
+    }
+
+    /**
+     * Accept enum flag values regardless of case across the root command and 
all subcommands. The
+     * V5 client enums are uppercase (LATEST, EARLIEST, FAIL, ...) while users 
have long passed the
+     * mixed-case v4 spellings (Latest, Earliest, ...); case-insensitive 
parsing keeps that flag UX
+     * working. Picocli's {@code setCaseInsensitiveEnumValuesAllowed} does not 
propagate to
+     * subcommands, so it must be applied to each command explicitly.
+     */
+    private void enableCaseInsensitiveEnums() {
+        applyCaseInsensitiveEnums(commander);
+    }
+
+    private static void applyCaseInsensitiveEnums(CommandLine cmd) {
+        cmd.setCaseInsensitiveEnumValuesAllowed(true);
+        
cmd.getSubcommands().values().forEach(PulsarClientTool::applyCaseInsensitiveEnums);
     }
 
     protected void addCommand(String name, Object cmd) {
         commander.addSubcommand(name, cmd);
+        enableCaseInsensitiveEnums();
     }
 
     private int updateConfig() throws UnsupportedAuthenticationException {
-        Map<String, Object> conf = new HashMap<>();
         Properties properties = pulsarClientPropertiesProvider.getProperties();
-        for (String key : properties.stringPropertyNames()) {
-            conf.put(key, properties.getProperty(key));
-        }
 
-        ClientBuilder clientBuilder = PulsarClient.builder().loadConf(conf)
-                .memoryLimit(rootParams.memoryLimit, SizeUnit.BYTES);
+        PulsarClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(MemorySize.ofBytes(rootParams.memoryLimit));
+
+        // The v4 Authentication object is still needed by the WebSocket 
produce/consume path,
+        // which talks HTTP and is not migrated to the binary-only V5 client.
         Authentication authentication = null;
         if (isNotBlank(this.rootParams.authPluginClassName)) {
             authentication = 
AuthenticationFactory.create(rootParams.authPluginClassName, 
rootParams.authParams);
-            clientBuilder.authentication(authentication);
+            try {
+                clientBuilder.authentication(rootParams.authPluginClassName, 
rootParams.authParams);
+            } catch (org.apache.pulsar.client.api.v5.PulsarClientException e) {
+                throw new UnsupportedAuthenticationException(e);
+            }
         }
         if (isNotBlank(this.rootParams.listenerName)) {
             clientBuilder.listenerName(this.rootParams.listenerName);
         }
-        clientBuilder.serviceUrl(rootParams.serviceURL);
-        
clientBuilder.tlsTrustCertsFilePath(this.rootParams.tlsTrustCertsFilePath);
+
+        // serviceUrl is only set on the V5 (binary) client for pulsar:// / 
pulsar+ssl:// URLs.
+        // A ws:// URL means the WebSocket path is used instead, which never 
builds a V5 client,
+        // and the V5 builder rejects non-broker schemes at configure time.
+        String serviceUrl = rootParams.serviceURL;
+        if (serviceUrl != null
+                && (serviceUrl.startsWith("pulsar://") || 
serviceUrl.startsWith("pulsar+ssl://"))) {
+            clientBuilder.serviceUrl(serviceUrl);
+        }
+
+        applyTlsPolicy(clientBuilder, serviceUrl, properties);
+
         if (isNotBlank(rootParams.proxyServiceURL)) {
             if (rootParams.proxyProtocol == null) {
                 commander.getErr().println("proxy-protocol must be provided 
with proxy-url");
                 return 1;
             }
-            clientBuilder.proxyServiceUrl(rootParams.proxyServiceURL, 
rootParams.proxyProtocol);
+            clientBuilder.connectionPolicy(ConnectionPolicy.builder()
+                    .proxy(rootParams.proxyServiceURL,
+                            
org.apache.pulsar.client.api.v5.config.ProxyProtocol.valueOf(
+                                    rootParams.proxyProtocol.name()))
+                    .build());
         }
         this.produceCommand.updateConfig(clientBuilder, authentication, 
this.rootParams.serviceURL);
         this.consumeCommand.updateConfig(clientBuilder, authentication, 
this.rootParams.serviceURL);
@@ -168,6 +203,46 @@ public class PulsarClientTool implements CommandHook {
         return 0;
     }
 
+    /**
+     * Translate the client.conf TLS settings onto the typed V5 {@link 
TlsPolicy}. V5 has no
+     * untyped {@code loadConf}, so the conf-file keys that have no dedicated 
CLI flag
+     * ({@code tlsAllowInsecureConnection}, {@code 
tlsEnableHostnameVerification}, the mTLS
+     * cert/key paths) are read from the properties here.
+     *
+     * <p>TLS is enabled only when the service URL uses {@code pulsar+ssl://} 
or the conf sets
+     * {@code useTls=true}; otherwise we leave the policy untouched so a 
plaintext broker is not
+     * accidentally contacted over TLS (calling {@code tlsPolicy()} always 
flips {@code useTls}
+     * on). Keystore TLS has no V5 equivalent and is reported as unsupported.
+     */
+    private void applyTlsPolicy(PulsarClientBuilder clientBuilder, String 
serviceUrl, Properties properties) {
+        boolean tlsByUrl = serviceUrl != null && 
serviceUrl.startsWith("pulsar+ssl://");
+        boolean tlsByConf = 
Boolean.parseBoolean(properties.getProperty("useTls", "false"));
+        if (!tlsByUrl && !tlsByConf) {
+            return;
+        }
+        if (Boolean.parseBoolean(properties.getProperty("useKeyStoreTls", 
"false"))) {
+            commander.getErr().println("Warning: keystore TLS (useKeyStoreTls) 
is not supported by the "
+                    + "V5-based pulsar-client; PEM trust/cert/key settings are 
used instead.");
+        }
+        TlsPolicy.Builder tls = TlsPolicy.builder()
+                .allowInsecureConnection(
+                        
Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", 
"false")))
+                .enableHostnameVerification(
+                        
Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", 
"false")));
+        if (isNotBlank(rootParams.tlsTrustCertsFilePath)) {
+            tls.trustCertsFilePath(rootParams.tlsTrustCertsFilePath);
+        }
+        String certFile = properties.getProperty("tlsCertificateFilePath");
+        if (isNotBlank(certFile)) {
+            tls.certificateFilePath(certFile);
+        }
+        String keyFile = properties.getProperty("tlsKeyFilePath");
+        if (isNotBlank(keyFile)) {
+            tls.keyFilePath(keyFile);
+        }
+        clientBuilder.tlsPolicy(tls.build());
+    }
+
     public int run(String[] args) {
         return commander.execute(args);
     }
@@ -199,6 +274,7 @@ public class PulsarClientTool implements CommandHook {
             commander.getCommandSpec().removeSubcommand("produce");
         }
         commander.addSubcommand("produce", this.produceCommand);
+        enableCaseInsensitiveEnums();
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
index 9834c49f248..0c49cde9539 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.cli;
 
 import static org.testng.Assert.assertEquals;
 import java.lang.reflect.Field;
+import java.util.Properties;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestCmdConsume {
@@ -44,4 +46,25 @@ public class TestCmdConsume {
                         + 
"?subscriptionType=Exclusive&subscriptionMode=Durable");
     }
 
+    @DataProvider(name = "mixedCaseEnumArgs")
+    public Object[][] mixedCaseEnumArgs() {
+        // The V5 client enums are uppercase; the CLI must still accept the 
mixed-case v4 spellings
+        // on the (sub)commands. picocli does not propagate case-insensitive 
parsing to subcommands,
+        // so this guards the explicit per-command wiring in PulsarClientTool.
+        return new Object[][] {
+            {"-p", "Earliest"}, {"-p", "earliest"}, {"-p", "EARLIEST"}, {"-p", 
"Latest"},
+            {"-t", "Exclusive"}, {"-t", "Shared"}, {"-t", "Failover"},
+            {"-m", "NonDurable"}, {"-ca", "DISCARD"},
+        };
+    }
+
+    @Test(dataProvider = "mixedCaseEnumArgs")
+    public void testCaseInsensitiveEnumFlags(String flag, String value) {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", "pulsar://localhost:6650");
+        PulsarClientTool tool = new PulsarClientTool(properties);
+        // Must not throw a picocli ParameterException for the mixed-case enum 
value.
+        tool.getCommander().parseArgs("consume", "-s", "sub", flag, value,
+                "persistent://public/default/t");
+    }
 }
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
index 1b918fbe8dd..c2ccc912ec2 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
@@ -20,21 +20,25 @@ package org.apache.pulsar.client.cli;
 
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.expectThrows;
 import java.util.Collections;
 import java.util.List;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class TestCmdProduce {
 
+    private static final String AVRO_DEF = "{\"type\": \"record\",\"namespace\": \"com.example\","
+            + "\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\", \"type\": \"string\" },"
+            + "{ \"name\": \"b\", \"type\": \"int\" }]}";
+
     CmdProduce cmdProduce;
 
     @BeforeMethod
@@ -51,56 +55,42 @@ public class TestCmdProduce {
     }
 
     @Test
-    public void testBuildSchema() {
-        // default
-        assertEquals(SchemaType.BYTES, CmdProduce.buildSchema("string", 
"bytes",
-                
CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET).getSchemaInfo().getType());
-
-        // simple key value
-        assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string", 
"string",
-                "separated").getSchemaInfo().getType());
-        assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string", 
"string",
-                "inline").getSchemaInfo().getType());
-
-        KeyValueSchema<?, ?> composite1 = (KeyValueSchema<?, ?>) 
CmdProduce.buildSchema("string",
-                "json:{\"type\": \"record\",\"namespace\": 
\"com.example\",\"name\": \"FullName\", \"fields\":"
-                        + " [{ \"name\": \"a\", \"type\": \"string\" }]}",
-                "inline");
-        assertEquals(KeyValueEncodingType.INLINE, 
composite1.getKeyValueEncodingType());
-        assertEquals(SchemaType.STRING, 
composite1.getKeySchema().getSchemaInfo().getType());
-        assertEquals(SchemaType.JSON, 
composite1.getValueSchema().getSchemaInfo().getType());
-
-        KeyValueSchema<?, ?> composite2 = (KeyValueSchema<?, ?>) 
CmdProduce.buildSchema(
-                "json:{\"type\": \"record\",\"namespace\": 
\"com.example\",\"name\": \"FullName\", \"fields"
-                        + "\": [{ \"name\": \"a\", \"type\": \"string\" }]}",
-                "avro:{\"type\": \"record\",\"namespace\": 
\"com.example\",\"name\": \"FullName\", \"fields\":"
-                        + " [{ \"name\": \"a\", \"type\": \"string\" }]}",
-                "inline");
-        assertEquals(KeyValueEncodingType.INLINE, 
composite2.getKeyValueEncodingType());
-        assertEquals(SchemaType.JSON, 
composite2.getKeySchema().getSchemaInfo().getType());
-        assertEquals(SchemaType.AVRO, 
composite2.getValueSchema().getSchemaInfo().getType());
+    public void testBuildValueSchema() {
+        // bytes -> raw BYTES, no native Avro schema.
+        CmdProduce.ValueSchema bytes = CmdProduce.buildValueSchema("bytes");
+        assertEquals(bytes.schema().schemaInfo().type(), SchemaType.BYTES);
+        assertNull(bytes.avroNative());
+
+        // string -> AUTO_PRODUCE_BYTES wrapping string; no native Avro schema.
+        CmdProduce.ValueSchema string = CmdProduce.buildValueSchema("string");
+        assertNotNull(string.schema());
+        assertNull(string.avroNative());
+
+        // avro:<def> -> AUTO_PRODUCE_BYTES wrapping a generic Avro schema; native Avro present.
+        CmdProduce.ValueSchema avro = CmdProduce.buildValueSchema("avro:" + AVRO_DEF);
+        assertNotNull(avro.schema());
+        assertNotNull(avro.avroNative());
+
+        // json:<def> -> AUTO_PRODUCE_BYTES wrapping a generic JSON schema; no native Avro schema.
+        CmdProduce.ValueSchema json = CmdProduce.buildValueSchema("json:" + AVRO_DEF);
+        assertNotNull(json.schema());
+        assertNull(json.avroNative());
+
+        // unknown -> rejected.
+        expectThrows(IllegalArgumentException.class, () -> CmdProduce.buildValueSchema("nope"));
     }
 
     @Test
     public void generateAvroMessageBodies() throws Exception {
-
-        Schema<?> schema = CmdProduce.buildSchema(
-                null,
-                "avro:{\"type\": \"record\",\"namespace\": 
\"com.example\",\"name\": \"FullName\", \"fields\":"
-                        + " [{ \"name\": \"a\", \"type\": \"string\" },"
-                        + "{ \"name\": \"b\", \"type\": \"int\" }"
-                        + "]}",
-                "");
+        CmdProduce.ValueSchema vs = CmdProduce.buildValueSchema("avro:" + AVRO_DEF);
 
         List<byte[]> bytes = CmdProduce.generateMessageBodies(List.of("{\"a\":\"stringValue\",\"b\":123}"),
-                Collections.emptyList(), schema);
+                Collections.emptyList(), vs.avroNative());
         assertEquals(bytes.size(), 1);
 
-        org.apache.avro.Schema avro = (org.apache.avro.Schema) 
schema.getNativeSchema().get();
-        GenericDatumReader<GenericRecord> reader = new 
GenericDatumReader<>(avro);
+        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(vs.avroNative());
         GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes.get(0), null));
         assertEquals("stringValue", record.get("a").toString());
         assertEquals(123, record.get("b"));
-
     }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
index 45f508f25cb..d95476986a8 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
@@ -19,11 +19,7 @@
 package org.apache.pulsar.client.cli;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 import java.lang.reflect.Field;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -31,11 +27,11 @@ public class TestCmdRead {
 
     @DataProvider(name = "startMessageIds")
     public Object[][] startMessageIds() {
+        // The V5-based reader (CheckpointConsumer) only accepts latest / earliest; the
+        // <ledgerId>:<entryId> form is no longer supported, so it is not exercised here.
         return new Object[][] {
             { "latest", "latest" },
             { "earliest", "earliest" },
-            { "10:0", "CAoQADAA" },
-            { "10:1", "CAoQATAA" },
         };
     }
 
@@ -51,19 +47,4 @@ public class TestCmdRead {
         assertEquals(cmdRead.getWebSocketReadUri(topicNameV2),
                 "ws://localhost:8080/ws/v2/reader/persistent/public/default/t2?messageId=" + msgIdQueryParam);
     }
-
-    @Test
-    public void testParseMessageId() {
-        assertEquals(CmdRead.parseMessageId("latest"), MessageId.latest);
-        assertEquals(CmdRead.parseMessageId("earliest"), MessageId.earliest);
-        assertEquals(CmdRead.parseMessageId("20:-1"), new MessageIdImpl(20, 
-1, -1));
-        assertEquals(CmdRead.parseMessageId("30:0"), new MessageIdImpl(30, 0, 
-1));
-        try {
-            CmdRead.parseMessageId("invalid");
-            fail("Should fail to parse invalid message ID");
-        } catch (Throwable t) {
-            assertTrue(t instanceof IllegalArgumentException);
-        }
-    }
-
 }
diff --git a/pulsar-client-v5/build.gradle.kts b/pulsar-client-v5/build.gradle.kts
index 9371a8eae82..0a5ebda446d 100644
--- a/pulsar-client-v5/build.gradle.kts
+++ b/pulsar-client-v5/build.gradle.kts
@@ -28,7 +28,11 @@ dependencies {
     implementation(libs.slf4j.api)
     implementation(libs.slog)
     implementation(libs.opentelemetry.api)
-    implementation(libs.protobuf.java)
+    // protobuf-java is only needed at compile time (for the Schema.protobuf() signature, which
+    // references com.google.protobuf.Message). Keeping it compileOnly — as pulsar-client and
+    // pulsar-client-api-v5 do — avoids dragging protobuf into every distribution that bundles the
+    // V5 client; callers that actually use protobuf schemas bring protobuf-java themselves.
+    compileOnly(libs.protobuf.java)
     implementation(libs.netty.handler)
     implementation(libs.jackson.annotations)
     compileOnly(libs.lombok)
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 7e8971a4ff9..0439f67b285 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -124,6 +124,18 @@ public final class PulsarClientProviderV5 implements PulsarClientProvider {
         return SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES());
     }
 
+    @Override
+    public Schema<?> genericSchema(org.apache.pulsar.client.api.v5.schema.SchemaInfo schemaInfo) {
+        var v4Info = SchemaAdapter.toV4SchemaInfo(schemaInfo);
+        return SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.generic(v4Info));
+    }
+
+    @Override
+    public Schema<byte[]> autoProduceBytesSchema(Schema<?> base) {
+        var v4Base = SchemaAdapter.toV4(base);
+        return SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(v4Base));
+    }
+
     // --- Checkpoint ---
 
     @Override
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
index e12b1a85b74..910f23a5a0a 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
@@ -54,6 +54,19 @@ final class SchemaAdapter {
         return new V4SchemaWrapper<>(v5Schema);
     }
 
+    /**
+     * Convert a v5 SchemaInfo into a v4 SchemaInfo. Used to build v4 generic schemas from a
+     * v5-supplied definition. The schema-type enum names are identical across v4 and v5.
+     */
+    static org.apache.pulsar.common.schema.SchemaInfo toV4SchemaInfo(SchemaInfo v5Info) {
+        return SchemaInfoImpl.builder()
+                .name(v5Info.name())
+                .type(org.apache.pulsar.common.schema.SchemaType.valueOf(v5Info.type().name()))
+                .schema(v5Info.schema())
+                .properties(v5Info.properties())
+                .build();
+    }
+
     /**
      * Wraps a v4 Schema as a v5 Schema.
      */
diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
new file mode 100644
index 00000000000..db4cc2b6ec6
--- /dev/null
+++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.nio.charset.StandardCharsets;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
+import org.testng.annotations.Test;
+
+/**
+ * Covers the V5 Schema factories added for the pulsar-client CLI migration:
+ * {@link Schema#generic(SchemaInfo)} and {@link Schema#autoProduceBytesOf(Schema)}, plus the
+ * {@link SchemaInfo#of} builder they consume.
+ */
+public class SchemaFactoryTest {
+
+    private static final String AVRO_DEF =
+            "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}";
+
+    @Test
+    public void testSchemaInfoOfRoundTrips() {
+        SchemaInfo info = SchemaInfo.of("client", SchemaType.AVRO,
+                AVRO_DEF.getBytes(StandardCharsets.UTF_8), null);
+        assertEquals(info.name(), "client");
+        assertEquals(info.type(), SchemaType.AVRO);
+        assertEquals(new String(info.schema(), StandardCharsets.UTF_8), AVRO_DEF);
+        assertNotNull(info.properties());
+        assertEquals(info.properties().size(), 0);
+    }
+
+    @Test
+    public void testGenericSchemaFromAvroDefinition() {
+        SchemaInfo info = SchemaInfo.of("client", SchemaType.AVRO,
+                AVRO_DEF.getBytes(StandardCharsets.UTF_8), null);
+        Schema<?> generic = Schema.generic(info);
+        assertNotNull(generic);
+        assertEquals(generic.schemaInfo().type(), SchemaType.AVRO);
+    }
+
+    @Test
+    public void testAutoProduceBytesOfWrapsBase() {
+        // The CLI wraps a typed base schema so pre-encoded bytes are validated against it.
+        Schema<byte[]> wrapped = Schema.autoProduceBytesOf(Schema.string());
+        assertNotNull(wrapped);
+        // AUTO_PRODUCE_BYTES encodes raw bytes straight through.
+        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+        assertEquals(wrapped.encode(payload), payload);
+    }
+}

Reply via email to