This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ab047fd3519 [improve][cli] Support 'auto_consume' schema type in pulsar-client consume/read (#25927)
ab047fd3519 is described below

commit ab047fd3519a328d8fc1b5447c7431efb443df6d
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 02:00:18 2026 -0700

    [improve][cli] Support 'auto_consume' schema type in pulsar-client consume/read (#25927)
---
 .../api/v5/internal/PulsarClientProvider.java      |  3 +
 .../apache/pulsar/client/api/v5/schema/Field.java  | 28 +++++++
 .../pulsar/client/api/v5/schema/GenericRecord.java | 80 +++++++++++++++++++
 .../pulsar/client/api/v5/schema/KeyValue.java      | 33 ++++++++
 .../apache/pulsar/client/api/v5/schema/Schema.java | 12 +++
 .../pulsar/client/cli/PulsarClientToolTest.java    | 39 ++++++++++
 .../pulsar/client/cli/AbstractCmdConsume.java      | 80 ++++++++++++++++++-
 .../org/apache/pulsar/client/cli/CmdConsume.java   | 16 ++--
 .../java/org/apache/pulsar/client/cli/CmdRead.java | 16 ++--
 .../pulsar/client/impl/v5/GenericRecordV5.java     | 89 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/v5/MessageV5.java    | 10 ++-
 .../client/impl/v5/PulsarClientProviderV5.java     | 11 +++
 .../pulsar/client/impl/v5/SchemaFactoryTest.java   | 84 ++++++++++++++++++++
 13 files changed, 483 insertions(+), 18 deletions(-)

diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index 670e3bc374a..9d3e41e2d12 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.v5.MessageId;
 import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.auth.Authentication;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 
 /**
@@ -82,6 +83,8 @@ public interface PulsarClientProvider {
 
     Schema<byte[]> autoProduceBytesSchema(Schema<?> base);
 
+    Schema<GenericRecord> autoConsumeSchema();
+
     // --- Checkpoint ---
 
     Checkpoint checkpointFromBytes(byte[] data) throws IOException;
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java
new file mode 100644
index 00000000000..1ea5830da5e
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+/**
+ * A named field of a {@link GenericRecord}.
+ *
+ * @param name  the field name
+ * @param index the position of the field within the record
+ */
+public record Field(String name, int index) {
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java
new file mode 100644
index 00000000000..fa308c8a009
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+import java.util.List;
+
+/**
+ * A schema-less view over a decoded message value, used when the value's 
schema is discovered at
+ * runtime rather than known at compile time (see {@link 
Schema#autoConsume()}).
+ *
+ * <p>The shape of the value depends on {@link #schemaType()}:
+ * <ul>
+ *   <li>{@link SchemaType#AVRO}, {@link SchemaType#JSON}, {@link 
SchemaType#PROTOBUF_NATIVE}: a
+ *       structured record — use {@link #fields()} and {@link #field(String)} 
to read its fields. A
+ *       field value may itself be a {@link GenericRecord} (nested 
record).</li>
+ *   <li>{@link SchemaType#KEY_VALUE}: {@link #fields()} is empty and {@link 
#nativeObject()} is a
+ *       {@link KeyValue}.</li>
+ *   <li>any primitive type (e.g. {@link SchemaType#STRING}, {@link 
SchemaType#INT32}):
+ *       {@link #fields()} is empty and {@link #nativeObject()} is the 
primitive value.</li>
+ * </ul>
+ */
+public interface GenericRecord {
+
+    /**
+     * The schema type of the decoded value.
+     *
+     * @return the {@link SchemaType}
+     */
+    SchemaType schemaType();
+
+    /**
+     * The underlying decoded value: a {@link KeyValue} for {@link 
SchemaType#KEY_VALUE}, a primitive
+     * for a primitive schema, or an implementation-specific native record for 
structured types.
+     *
+     * @return the native object, or {@code null}
+     */
+    Object nativeObject();
+
+    /**
+     * The fields of this record, in declaration order. Empty for non-record 
(primitive or
+     * key/value) values.
+     *
+     * @return the list of {@link Field}s
+     */
+    List<Field> fields();
+
+    /**
+     * Get the value of a field by name.
+     *
+     * @param fieldName the field name
+     * @return the field value, or {@code null} if absent
+     */
+    Object field(String fieldName);
+
+    /**
+     * Get the value of a field.
+     *
+     * @param field the field
+     * @return the field value, or {@code null} if absent
+     */
+    default Object field(Field field) {
+        return field(field.name());
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java
new file mode 100644
index 00000000000..504977d6299
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+/**
+ * A key/value pair, used as the {@linkplain GenericRecord#nativeObject() 
native object} of a
+ * {@link GenericRecord} whose {@linkplain GenericRecord#schemaType() schema 
type} is
+ * {@link SchemaType#KEY_VALUE}. Each side may itself be a {@link 
GenericRecord} (for a structured
+ * key or value), a primitive, or {@code null}.
+ *
+ * @param key   the key (may be {@code null})
+ * @param value the value (may be {@code null})
+ * @param <K>   the key type
+ * @param <V>   the value type
+ */
+public record KeyValue<K, V>(K key, V value) {
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
index 173cf0a77ff..e5debe8fb04 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
@@ -230,4 +230,16 @@ public interface Schema<T> {
     static Schema<byte[]> autoProduceBytesOf(Schema<?> base) {
         return PulsarClientProvider.get().autoProduceBytesSchema(base);
     }
+
+    /**
+     * Get a schema that auto-detects the topic schema at runtime and decodes 
each message into a
+     * {@link GenericRecord}. Use this on the consumer side when the value 
type is not known at
+     * compile time (e.g. a generic consume tool). The decoded {@link 
GenericRecord} exposes the
+     * value's runtime {@link SchemaType} and, for structured types, its 
fields.
+     *
+     * @return a {@link Schema} that decodes messages into {@link 
GenericRecord} values
+     */
+    static Schema<GenericRecord> autoConsume() {
+        return PulsarClientProvider.get().autoConsumeSchema();
+    }
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 9db7f9596f9..dbdbb4422ae 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -23,7 +23,10 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Properties;
@@ -40,6 +43,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -259,6 +263,41 @@ public class PulsarClientToolTest extends BrokerTestBase {
                 .until(()->admin.topics().getSubscriptions(topicName).size() 
== 0);
     }
 
+    @Test(timeOut = 30000)
+    public void testAutoConsumeSchema() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
+        properties.setProperty("useTls", "false");
+
+        final String topicName = getTopicWithRandomSuffix("auto-consume");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Produce a JSON-schema message with the v4 client so the topic 
carries a real schema; the
+        // CLI then consumes it with `-st auto_consume`, exercising the V5 
generic-record path.
+        @Cleanup
+        Producer<TestKey> producer = 
pulsarClient.newProducer(Schema.JSON(TestKey.class))
+                .topic(topicName).create();
+        producer.send(new TestKey("my-key", Integer.MAX_VALUE));
+
+        ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream();
+        PrintStream originalOut = System.out;
+        System.setOut(new PrintStream(consoleOutput, true, 
StandardCharsets.UTF_8));
+        try {
+            PulsarClientTool pulsarClientToolConsumer = new 
PulsarClientTool(properties);
+            String[] args = {"consume", "-s", "sub-name", "-n", "1", "-st", 
"auto_consume",
+                    "-p", "Earliest", topicName};
+            Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+        } finally {
+            System.setOut(originalOut);
+        }
+
+        String output = consoleOutput.toString(StandardCharsets.UTF_8);
+        assertTrue(output.contains("keyA"), output);
+        assertTrue(output.contains("my-key"), output);
+        assertTrue(output.contains("keyB"), output);
+        assertTrue(output.contains(Integer.toString(Integer.MAX_VALUE)), 
output);
+    }
+
     @Test(timeOut = 20000)
     public void testEncryption() throws Exception {
         Properties properties = new Properties();
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
index 543c6f99100..8e9e1d57ac8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -25,6 +25,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -38,6 +40,9 @@ import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
 import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
 import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.eclipse.jetty.websocket.api.Callback;
 import org.eclipse.jetty.websocket.api.Session;
@@ -84,14 +89,23 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
      *            Whether to display BytesMessages in hexdump style, ignored 
for simple text messages
      * @return String representation of the message
      */
-    protected String interpretMessage(Message<byte[]> message, boolean 
displayHex, boolean printMetadata)
+    protected String interpretMessage(Message<?> message, boolean displayHex, 
boolean printMetadata)
             throws IOException {
         StringBuilder sb = new StringBuilder();
 
         String properties = 
Arrays.toString(message.properties().entrySet().toArray());
 
-        byte[] value = message.value();
-        String data = value == null ? "null" : interpretByteArray(displayHex, 
value);
+        Object value = message.value();
+        String data;
+        if (value == null) {
+            data = "null";
+        } else if (value instanceof byte[]) {
+            data = interpretByteArray(displayHex, (byte[]) value);
+        } else if (value instanceof GenericRecord) {
+            data = genericObjectToMap((GenericRecord) value, 
displayHex).toString();
+        } else {
+            data = value.toString();
+        }
 
         sb.append("publishTime:[").append(message.publishTime()).append("], ");
         
sb.append("eventTime:[").append(message.eventTime().orElse(null)).append("], ");
@@ -122,6 +136,66 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
         }
     }
 
+    /**
+     * Render an {@code auto_consume} {@link GenericRecord} value into a 
{@link Map} for display.
+     * The shape is dispatched on the record's runtime schema type: structured 
records become a map
+     * of their fields, key/value records become a {@code {key, value}} map, 
and primitives are
+     * wrapped in a single {@code value} entry.
+     */
+    protected static Map<String, Object> genericObjectToMap(GenericRecord 
value, boolean displayHex)
+            throws IOException {
+        switch (value.schemaType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                return genericRecordToMap(value, displayHex);
+            case KEY_VALUE:
+                return keyValueToMap((KeyValue<?, ?>) value.nativeObject(), 
displayHex);
+            default:
+                return primitiveValueToMap(value.nativeObject(), displayHex);
+        }
+    }
+
+    protected static Map<String, Object> keyValueToMap(KeyValue<?, ?> value, 
boolean displayHex)
+            throws IOException {
+        if (value == null) {
+            return Map.of("value", "NULL");
+        }
+        return Map.of("key", primitiveValueToMap(value.key(), displayHex),
+                "value", primitiveValueToMap(value.value(), displayHex));
+    }
+
+    protected static Map<String, Object> primitiveValueToMap(Object value, 
boolean displayHex)
+            throws IOException {
+        if (value == null) {
+            return Map.of("value", "NULL");
+        }
+        if (value instanceof GenericRecord) {
+            return genericObjectToMap((GenericRecord) value, displayHex);
+        }
+        if (value instanceof byte[]) {
+            value = interpretByteArray(displayHex, (byte[]) value);
+        }
+        return Map.of("value", value.toString(), "type", value.getClass());
+    }
+
+    protected static Map<String, Object> genericRecordToMap(GenericRecord 
value, boolean displayHex)
+            throws IOException {
+        Map<String, Object> res = new HashMap<>();
+        for (Field f : value.fields()) {
+            Object fieldValue = value.field(f);
+            if (fieldValue instanceof GenericRecord) {
+                fieldValue = genericRecordToMap((GenericRecord) fieldValue, 
displayHex);
+            } else if (fieldValue == null) {
+                fieldValue = "NULL";
+            } else if (fieldValue instanceof byte[]) {
+                fieldValue = interpretByteArray(displayHex, (byte[]) 
fieldValue);
+            }
+            res.put(f.name(), fieldValue);
+        }
+        return res;
+    }
+
     /**
      * Build a consumer-side decryption policy from a {@code file://} key URI, 
mirroring the v4
      * {@code defaultCryptoKeyReader(uri)} semantics: the private key is 
loaded once and returned
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4c7164d7db3..50f52a84ad1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -179,11 +179,13 @@ public class CmdConsume extends AbstractCmdConsume {
         int numMessagesConsumed = 0;
         int returnCode = 0;
 
+        final Schema<?> schema;
         if ("auto_consume".equals(schemaType)) {
-            throw new IllegalArgumentException("schema type 'auto_consume' is 
not supported by this "
-                    + "version of pulsar-client; consume with 'bytes' (the 
default).");
-        } else if (!"bytes".equals(schemaType)) {
-            throw new IllegalArgumentException("schema type must be 'bytes'");
+            schema = Schema.autoConsume();
+        } else if ("bytes".equals(schemaType)) {
+            schema = Schema.bytes();
+        } else {
+            throw new IllegalArgumentException("schema type must be 'bytes' or 
'auto_consume'");
         }
         if (!poolMessages) {
             LOG.info("--pool-messages has no effect on this version of 
pulsar-client.");
@@ -208,7 +210,7 @@ public class CmdConsume extends AbstractCmdConsume {
 
         try (PulsarClient client = clientBuilder.build()) {
             RateLimiter limiter = (this.consumeRate > 0) ? 
RateLimiter.create(this.consumeRate) : null;
-            QueueConsumerBuilder<byte[]> builder = 
client.newQueueConsumer(Schema.bytes())
+            QueueConsumerBuilder<?> builder = client.newQueueConsumer(schema)
                     .subscriptionName(this.subscriptionName)
                     .subscriptionInitialPosition(subscriptionInitialPosition)
                     .replicateSubscriptionState(replicateSubscriptionState);
@@ -220,12 +222,12 @@ public class CmdConsume extends AbstractCmdConsume {
             }
             applyTopicSelection(builder::topic, builder::namespace);
 
-            try (QueueConsumer<byte[]> consumer = builder.subscribe()) {
+            try (QueueConsumer<?> consumer = builder.subscribe()) {
                 while (this.numMessagesToConsume == 0 || numMessagesConsumed < 
this.numMessagesToConsume) {
                     if (limiter != null) {
                         limiter.acquire();
                     }
-                    Message<byte[]> msg = 
consumer.receive(Duration.ofSeconds(5));
+                    Message<?> msg = consumer.receive(Duration.ofSeconds(5));
                     if (msg == null) {
                         LOG.debug("No message to consume after waiting for 5 
seconds.");
                     } else {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 63ecb4e4e76..33a5c8e9036 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -139,11 +139,13 @@ public class CmdRead extends AbstractCmdConsume {
         int numMessagesRead = 0;
         int returnCode = 0;
 
+        final Schema<?> schema;
         if ("auto_consume".equals(schemaType)) {
-            throw new IllegalArgumentException("schema type 'auto_consume' is 
not supported by this "
-                    + "version of pulsar-client; read with 'bytes' (the 
default).");
-        } else if (!"bytes".equals(schemaType)) {
-            throw new IllegalArgumentException("schema type must be 'bytes'");
+            schema = Schema.autoConsume();
+        } else if ("bytes".equals(schemaType)) {
+            schema = Schema.bytes();
+        } else {
+            throw new IllegalArgumentException("schema type must be 'bytes' or 
'auto_consume'");
         }
         if (!poolMessages) {
             LOG.info("--pool-messages has no effect on this version of 
pulsar-client.");
@@ -160,21 +162,21 @@ public class CmdRead extends AbstractCmdConsume {
                 ? Checkpoint.earliest() : Checkpoint.latest();
 
         try (PulsarClient client = clientBuilder.build()) {
-            CheckpointConsumerBuilder<byte[]> builder = 
client.newCheckpointConsumer(Schema.bytes())
+            CheckpointConsumerBuilder<?> builder = 
client.newCheckpointConsumer(schema)
                     .topic(topic)
                     .startPosition(startPosition);
             if (isNotBlank(this.encKeyValue)) {
                 builder.encryptionPolicy(buildConsumerEncryptionPolicy());
             }
 
-            try (CheckpointConsumer<byte[]> reader = builder.create()) {
+            try (CheckpointConsumer<?> reader = builder.create()) {
                 RateLimiter limiter = (this.readRate > 0) ? 
RateLimiter.create(this.readRate) : null;
                 while (this.numMessagesToRead == 0 || numMessagesRead < 
this.numMessagesToRead) {
                     if (limiter != null) {
                         limiter.acquire();
                     }
 
-                    Message<byte[]> msg = 
reader.receive(Duration.ofSeconds(5));
+                    Message<?> msg = reader.receive(Duration.ofSeconds(5));
                     if (msg == null) {
                         LOG.debug("No message to read after waiting for 5 
seconds.");
                     } else {
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java
new file mode 100644
index 00000000000..61371d95fbb
--- /dev/null
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.List;
+import org.apache.pulsar.client.api.v5.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
+
+/**
+ * Adapts a v4 {@link org.apache.pulsar.client.api.schema.GenericRecord} 
(produced by the v4
+ * {@code AUTO_CONSUME} schema) to the v5 {@link GenericRecord} interface. 
Nested records and
+ * {@code KEY_VALUE} native objects are converted lazily on access so the v5 
API never leaks v4
+ * types.
+ */
+final class GenericRecordV5 implements GenericRecord {
+
+    private final org.apache.pulsar.client.api.schema.GenericRecord v4Record;
+    private final List<Field> fields;
+
+    private GenericRecordV5(org.apache.pulsar.client.api.schema.GenericRecord 
v4Record) {
+        this.v4Record = v4Record;
+        this.fields = v4Record.getFields().stream()
+                .map(f -> new Field(f.getName(), f.getIndex()))
+                .toList();
+    }
+
+    /**
+     * Convert a decoded value into its v5-facing form: a v4 {@code 
GenericRecord} becomes a
+     * {@link GenericRecordV5}, a v4 {@code KeyValue} becomes a v5 {@link 
KeyValue} (with both sides
+     * converted recursively), and anything else (primitive, byte[], ...) is 
returned unchanged.
+     */
+    static Object convert(Object value) {
+        if (value instanceof org.apache.pulsar.client.api.schema.GenericRecord 
gr) {
+            return new GenericRecordV5(gr);
+        }
+        if (value instanceof org.apache.pulsar.common.schema.KeyValue<?, ?> 
kv) {
+            return new KeyValue<>(convert(kv.getKey()), 
convert(kv.getValue()));
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaType schemaType() {
+        org.apache.pulsar.common.schema.SchemaType v4Type = 
v4Record.getSchemaType();
+        if (v4Type == null) {
+            return SchemaType.NONE;
+        }
+        try {
+            return SchemaType.valueOf(v4Type.name());
+        } catch (IllegalArgumentException e) {
+            // v4 has primitive types with no v5 counterpart (e.g. date/time 
variants); they are
+            // handled by the generic "primitive" path, so NONE is a safe 
mapping.
+            return SchemaType.NONE;
+        }
+    }
+
+    @Override
+    public Object nativeObject() {
+        return convert(v4Record.getNativeObject());
+    }
+
+    @Override
+    public List<Field> fields() {
+        return fields;
+    }
+
+    @Override
+    public Object field(String fieldName) {
+        return convert(v4Record.getField(fieldName));
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
index 1a9444911f4..b349c941c7f 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.v5;
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.v5.Message;
 import org.apache.pulsar.client.api.v5.MessageId;
 
@@ -84,8 +85,15 @@ final class MessageV5<T> implements Message<T> {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public T value() {
-        return v4Message.getValue();
+        Object value = v4Message.getValue();
+        // The v4 AUTO_CONSUME / generic schemas decode into a v4 
GenericRecord; surface it through
+        // the v5 API as a v5 GenericRecord. All other values pass through 
unchanged.
+        if (value instanceof GenericRecord) {
+            return (T) GenericRecordV5.convert(value);
+        }
+        return (T) value;
     }
 
     @Override
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 0439f67b285..fe8ac87a70d 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.auth.Authentication;
 import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 
 /**
@@ -136,6 +137,16 @@ public final class PulsarClientProviderV5 implements PulsarClientProvider {
         return 
SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(v4Base));
     }
 
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Schema<GenericRecord> autoConsumeSchema() {
+        // Wrap the genuine v4 AUTO_CONSUME schema so the v5 consumer passes 
it straight through to
+        // the v4 layer (which special-cases AutoConsumeSchema for runtime 
schema fetching). The
+        // decoded v4 GenericRecord is converted to a v5 GenericRecord in 
MessageV5#value().
+        return SchemaAdapter.toV5((org.apache.pulsar.client.api.Schema)
+                org.apache.pulsar.client.api.Schema.AUTO_CONSUME());
+    }
+
     // --- Checkpoint ---
 
     @Override
diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
index db4cc2b6ec6..75a844c58db 100644
--- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
+++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
@@ -20,10 +20,17 @@ package org.apache.pulsar.client.impl.v5;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
 import org.apache.pulsar.client.api.v5.schema.SchemaType;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.testng.annotations.Test;
 
 /**
@@ -65,4 +72,81 @@ public class SchemaFactoryTest {
         byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
         assertEquals(wrapped.encode(payload), payload);
     }
+
+    @Test
+    public void testAutoConsumeUnwrapsToV4AutoConsumeSchema() {
+        // The v5 auto-consume schema must pass the genuine v4 
AutoConsumeSchema down to the v4
+        // consumer, which special-cases it for runtime schema fetching.
+        Schema<GenericRecord> autoConsume = Schema.autoConsume();
+        assertNotNull(autoConsume);
+        // The v4 AutoConsumeSchema reports no SchemaInfo until it fetches the 
topic's schema at
+        // runtime, so we only assert the unwrapped v4 schema is the genuine 
AutoConsumeSchema.
+        assertTrue((Object) SchemaAdapter.toV4(autoConsume) instanceof 
AutoConsumeSchema);
+    }
+
+    @Test
+    public void testGenericRecordConversion() {
+        // A nested v4 GenericRecord field is surfaced as a v5 GenericRecord.
+        org.apache.pulsar.client.api.schema.GenericRecord v4Nested = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.AVRO,
+                List.of(new Field("inner", 0)), Map.of("inner", "x"), null);
+        org.apache.pulsar.client.api.schema.GenericRecord v4Outer = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.AVRO,
+                List.of(new Field("a", 0), new Field("nested", 1)),
+                Map.of("a", "hello", "nested", v4Nested), null);
+
+        GenericRecord outer = (GenericRecord) GenericRecordV5.convert(v4Outer);
+        assertEquals(outer.schemaType(), SchemaType.AVRO);
+        assertEquals(outer.fields().size(), 2);
+        assertEquals(outer.field("a"), "hello");
+        assertTrue(outer.field("nested") instanceof GenericRecord);
+        assertEquals(((GenericRecord) outer.field("nested")).field("inner"), 
"x");
+    }
+
+    @Test
+    public void testKeyValueConversion() {
+        // A KEY_VALUE wrapper's native object is surfaced as a v5 KeyValue.
+        org.apache.pulsar.common.schema.KeyValue<String, String> v4Kv =
+                new org.apache.pulsar.common.schema.KeyValue<>("k", "v");
+        org.apache.pulsar.client.api.schema.GenericRecord v4Record = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.KEY_VALUE, 
List.of(), Map.of(), v4Kv);
+
+        GenericRecord record = (GenericRecord) 
GenericRecordV5.convert(v4Record);
+        assertEquals(record.schemaType(), SchemaType.KEY_VALUE);
+        assertTrue(record.nativeObject() instanceof KeyValue);
+        KeyValue<?, ?> kv = (KeyValue<?, ?>) record.nativeObject();
+        assertEquals(kv.key(), "k");
+        assertEquals(kv.value(), "v");
+    }
+
+    private static org.apache.pulsar.client.api.schema.GenericRecord v4Record(
+            org.apache.pulsar.common.schema.SchemaType type, List<Field> 
fields,
+            Map<String, Object> values, Object nativeObject) {
+        return new org.apache.pulsar.client.api.schema.GenericRecord() {
+            @Override
+            public byte[] getSchemaVersion() {
+                return null;
+            }
+
+            @Override
+            public List<Field> getFields() {
+                return fields;
+            }
+
+            @Override
+            public Object getField(String fieldName) {
+                return values.get(fieldName);
+            }
+
+            @Override
+            public org.apache.pulsar.common.schema.SchemaType getSchemaType() {
+                return type;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return nativeObject;
+            }
+        };
+    }
 }


Reply via email to