This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ab047fd3519 [improve][cli] Support 'auto_consume' schema type in
pulsar-client consume/read (#25927)
ab047fd3519 is described below
commit ab047fd3519a328d8fc1b5447c7431efb443df6d
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 02:00:18 2026 -0700
[improve][cli] Support 'auto_consume' schema type in pulsar-client
consume/read (#25927)
---
.../api/v5/internal/PulsarClientProvider.java | 3 +
.../apache/pulsar/client/api/v5/schema/Field.java | 28 +++++++
.../pulsar/client/api/v5/schema/GenericRecord.java | 80 +++++++++++++++++++
.../pulsar/client/api/v5/schema/KeyValue.java | 33 ++++++++
.../apache/pulsar/client/api/v5/schema/Schema.java | 12 +++
.../pulsar/client/cli/PulsarClientToolTest.java | 39 ++++++++++
.../pulsar/client/cli/AbstractCmdConsume.java | 80 ++++++++++++++++++-
.../org/apache/pulsar/client/cli/CmdConsume.java | 16 ++--
.../java/org/apache/pulsar/client/cli/CmdRead.java | 16 ++--
.../pulsar/client/impl/v5/GenericRecordV5.java | 89 ++++++++++++++++++++++
.../apache/pulsar/client/impl/v5/MessageV5.java | 10 ++-
.../client/impl/v5/PulsarClientProviderV5.java | 11 +++
.../pulsar/client/impl/v5/SchemaFactoryTest.java | 84 ++++++++++++++++++++
13 files changed, 483 insertions(+), 18 deletions(-)
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index 670e3bc374a..9d3e41e2d12 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.v5.MessageId;
import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.auth.Authentication;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
import org.apache.pulsar.client.api.v5.schema.Schema;
/**
@@ -82,6 +83,8 @@ public interface PulsarClientProvider {
Schema<byte[]> autoProduceBytesSchema(Schema<?> base);
+ /**
+ * Returns the schema backing {@link Schema#autoConsume()}: a consumer-side schema that decodes
+ * each message into a {@link GenericRecord} using the topic schema discovered at runtime.
+ *
+ * @return the auto-consume schema
+ */
+ Schema<GenericRecord> autoConsumeSchema();
+
// --- Checkpoint ---
Checkpoint checkpointFromBytes(byte[] data) throws IOException;
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java
new file mode 100644
index 00000000000..1ea5830da5e
--- /dev/null
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Field.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+/**
+ * A named field of a {@link GenericRecord}.
+ *
+ * @param name the field name
+ * @param index the position of the field within the record
+ */
+public record Field(String name, int index) {
+}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java
new file mode 100644
index 00000000000..fa308c8a009
--- /dev/null
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/GenericRecord.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+import java.util.List;
+
+/**
+ * A schema-less view over a decoded message value, used when the value's
schema is discovered at
+ * runtime rather than known at compile time (see {@link
Schema#autoConsume()}).
+ *
+ * <p>The shape of the value depends on {@link #schemaType()}:
+ * <ul>
+ * <li>{@link SchemaType#AVRO}, {@link SchemaType#JSON}, {@link
SchemaType#PROTOBUF_NATIVE}: a
+ * structured record — use {@link #fields()} and {@link #field(String)}
to read its fields. A
+ * field value may itself be a {@link GenericRecord} (nested
record).</li>
+ * <li>{@link SchemaType#KEY_VALUE}: {@link #fields()} is empty and {@link
#nativeObject()} is a
+ * {@link KeyValue}.</li>
+ * <li>any primitive type (e.g. {@link SchemaType#STRING}, {@link
SchemaType#INT32}):
+ * {@link #fields()} is empty and {@link #nativeObject()} is the
primitive value.</li>
+ * </ul>
+ */
+public interface GenericRecord {
+
+ /**
+ * The schema type of the decoded value.
+ *
+ * @return the {@link SchemaType}
+ */
+ SchemaType schemaType();
+
+ /**
+ * The underlying decoded value: a {@link KeyValue} for {@link
SchemaType#KEY_VALUE}, a primitive
+ * for a primitive schema, or an implementation-specific native record for
structured types.
+ *
+ * @return the native object, or {@code null}
+ */
+ Object nativeObject();
+
+ /**
+ * The fields of this record, in declaration order. Empty for non-record
(primitive or
+ * key/value) values.
+ *
+ * @return the list of {@link Field}s
+ */
+ List<Field> fields();
+
+ /**
+ * Get the value of a field by name.
+ *
+ * @param fieldName the field name
+ * @return the field value, or {@code null} if absent
+ */
+ Object field(String fieldName);
+
+ /**
+ * Get the value of a field.
+ *
+ * @param field the field
+ * @return the field value, or {@code null} if absent
+ */
+ default Object field(Field field) {
+ return field(field.name());
+ }
+}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java
new file mode 100644
index 00000000000..504977d6299
--- /dev/null
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/KeyValue.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.schema;
+
+/**
+ * A key/value pair, used as the {@linkplain GenericRecord#nativeObject()
native object} of a
+ * {@link GenericRecord} whose {@linkplain GenericRecord#schemaType() schema
type} is
+ * {@link SchemaType#KEY_VALUE}. Each side may itself be a {@link
GenericRecord} (for a structured
+ * key or value), a primitive, or {@code null}.
+ *
+ * @param key the key (may be {@code null})
+ * @param value the value (may be {@code null})
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public record KeyValue<K, V>(K key, V value) {
+}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
index 173cf0a77ff..e5debe8fb04 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
@@ -230,4 +230,16 @@ public interface Schema<T> {
static Schema<byte[]> autoProduceBytesOf(Schema<?> base) {
return PulsarClientProvider.get().autoProduceBytesSchema(base);
}
+
+    /**
+     * Returns a schema that discovers the topic's schema at runtime and decodes every message into
+     * a {@link GenericRecord}. Use it on the consumer side when the value type cannot be known at
+     * compile time, for example in a generic inspection tool. Each decoded record reports its
+     * runtime {@link SchemaType} and, for structured types, exposes its fields.
+     *
+     * @return a {@link Schema} producing {@link GenericRecord} values
+     */
+    static Schema<GenericRecord> autoConsume() {
+        PulsarClientProvider provider = PulsarClientProvider.get();
+        return provider.autoConsumeSchema();
+    }
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 9db7f9596f9..dbdbb4422ae 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -23,7 +23,10 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Properties;
@@ -40,6 +43,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -259,6 +263,41 @@ public class PulsarClientToolTest extends BrokerTestBase {
.until(()->admin.topics().getSubscriptions(topicName).size()
== 0);
}
+
+    @Test(timeOut = 30000)
+    public void testAutoConsumeSchema() throws Exception {
+        Properties clientProps = new Properties();
+        clientProps.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
+        clientProps.setProperty("useTls", "false");
+
+        final String topicName = getTopicWithRandomSuffix("auto-consume");
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        // Publish one message through the v4 client with a JSON schema, so the topic carries a real
+        // registered schema. The CLI below reads it back with `-st auto_consume`, which exercises the
+        // V5 generic-record decoding path.
+        @Cleanup
+        Producer<TestKey> producer = pulsarClient.newProducer(Schema.JSON(TestKey.class))
+                .topic(topicName)
+                .create();
+        producer.send(new TestKey("my-key", Integer.MAX_VALUE));
+
+        // Capture whatever the CLI prints to stdout while it runs, and always restore System.out.
+        ByteArrayOutputStream captured = new ByteArrayOutputStream();
+        PrintStream savedOut = System.out;
+        System.setOut(new PrintStream(captured, true, StandardCharsets.UTF_8));
+        try {
+            String[] cliArgs = {
+                    "consume", "-s", "sub-name", "-n", "1", "-st", "auto_consume",
+                    "-p", "Earliest", topicName
+            };
+            Assert.assertEquals(new PulsarClientTool(clientProps).run(cliArgs), 0);
+        } finally {
+            System.setOut(savedOut);
+        }
+
+        // Both the field names and their values must appear in the rendered record.
+        String printed = captured.toString(StandardCharsets.UTF_8);
+        String[] expectedFragments = {"keyA", "my-key", "keyB", Integer.toString(Integer.MAX_VALUE)};
+        for (String fragment : expectedFragments) {
+            assertTrue(printed.contains(fragment), printed);
+        }
+    }
+
@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
index 543c6f99100..8e9e1d57ac8 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -25,6 +25,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -38,6 +40,9 @@ import
org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
@@ -84,14 +89,23 @@ public abstract class AbstractCmdConsume extends
AbstractCmd {
* Whether to display BytesMessages in hexdump style, ignored
for simple text messages
* @return String representation of the message
*/
- protected String interpretMessage(Message<byte[]> message, boolean
displayHex, boolean printMetadata)
+ protected String interpretMessage(Message<?> message, boolean displayHex,
boolean printMetadata)
throws IOException {
StringBuilder sb = new StringBuilder();
String properties =
Arrays.toString(message.properties().entrySet().toArray());
- byte[] value = message.value();
- String data = value == null ? "null" : interpretByteArray(displayHex,
value);
+ Object value = message.value();
+ String data;
+ if (value == null) {
+ data = "null";
+ } else if (value instanceof byte[]) {
+ data = interpretByteArray(displayHex, (byte[]) value);
+ } else if (value instanceof GenericRecord) {
+ data = genericObjectToMap((GenericRecord) value,
displayHex).toString();
+ } else {
+ data = value.toString();
+ }
sb.append("publishTime:[").append(message.publishTime()).append("], ");
sb.append("eventTime:[").append(message.eventTime().orElse(null)).append("], ");
@@ -122,6 +136,66 @@ public abstract class AbstractCmdConsume extends
AbstractCmd {
}
}
+    /**
+     * Render an {@code auto_consume} {@link GenericRecord} value into a {@link Map} for display.
+     * The shape is dispatched on the record's runtime schema type:
+     * <ul>
+     *   <li>structured records become a map of their fields;</li>
+     *   <li>key/value records become a {@code {key, value}} map;</li>
+     *   <li>primitives are wrapped in a {@code value} entry plus a {@code type} entry.</li>
+     * </ul>
+     *
+     * @param value the decoded record; must not be {@code null}
+     * @param displayHex whether {@code byte[]} contents are rendered as a hex dump
+     * @return a display map for {@code value}
+     * @throws IOException if rendering a {@code byte[]} value fails
+     */
+    protected static Map<String, Object> genericObjectToMap(GenericRecord value, boolean displayHex)
+            throws IOException {
+        switch (value.schemaType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                return genericRecordToMap(value, displayHex);
+            case KEY_VALUE: {
+                Object nativeObject = value.nativeObject();
+                // Guard the cast: render an unexpected native object as a plain value instead of
+                // aborting the whole consume loop with a ClassCastException.
+                if (nativeObject == null || nativeObject instanceof KeyValue) {
+                    return keyValueToMap((KeyValue<?, ?>) nativeObject, displayHex);
+                }
+                return primitiveValueToMap(nativeObject, displayHex);
+            }
+            default:
+                return primitiveValueToMap(value.nativeObject(), displayHex);
+        }
+    }
+
+    /**
+     * Render a key/value pair as a {@code {key, value}} map, rendering each side with
+     * {@link #primitiveValueToMap(Object, boolean)}.
+     *
+     * @param value the pair, or {@code null}
+     * @param displayHex whether {@code byte[]} contents are rendered as a hex dump
+     * @return the display map; {@code {value=NULL}} for a {@code null} pair
+     * @throws IOException if rendering a {@code byte[]} value fails
+     */
+    protected static Map<String, Object> keyValueToMap(KeyValue<?, ?> value, boolean displayHex)
+            throws IOException {
+        if (value == null) {
+            return Map.of("value", "NULL");
+        }
+        return Map.of("key", primitiveValueToMap(value.key(), displayHex),
+                "value", primitiveValueToMap(value.value(), displayHex));
+    }
+
+    /**
+     * Render a single value. Nested records and key/value pairs are rendered structurally; any
+     * other value becomes a {@code {value, type}} map.
+     *
+     * @param value the value, or {@code null}
+     * @param displayHex whether {@code byte[]} contents are rendered as a hex dump
+     * @return the display map; {@code {value=NULL}} for {@code null}
+     * @throws IOException if rendering a {@code byte[]} value fails
+     */
+    protected static Map<String, Object> primitiveValueToMap(Object value, boolean displayHex)
+            throws IOException {
+        if (value == null) {
+            return Map.of("value", "NULL");
+        }
+        if (value instanceof GenericRecord) {
+            return genericObjectToMap((GenericRecord) value, displayHex);
+        }
+        if (value instanceof KeyValue) {
+            return keyValueToMap((KeyValue<?, ?>) value, displayHex);
+        }
+        // Capture the runtime type before a byte[] is rendered to a String, so "type" reports the
+        // decoded value's class rather than the class of its display form.
+        Class<?> type = value.getClass();
+        String rendered = value instanceof byte[]
+                ? interpretByteArray(displayHex, (byte[]) value)
+                : value.toString();
+        return Map.of("value", rendered, "type", type);
+    }
+
+    /**
+     * Render a structured record as a map from field name to display value.
+     *
+     * @param value the record
+     * @param displayHex whether {@code byte[]} contents are rendered as a hex dump
+     * @return the display map of the record's fields
+     * @throws IOException if rendering a {@code byte[]} field fails
+     */
+    protected static Map<String, Object> genericRecordToMap(GenericRecord value, boolean displayHex)
+            throws IOException {
+        Map<String, Object> res = new HashMap<>();
+        for (Field f : value.fields()) {
+            res.put(f.name(), renderFieldValue(value.field(f), displayHex));
+        }
+        return res;
+    }
+
+    /**
+     * Convert one field value of a structured record into its display form. Nested records go
+     * through the schema-type dispatch so that nested key/value or primitive records are not lost.
+     */
+    private static Object renderFieldValue(Object fieldValue, boolean displayHex) throws IOException {
+        if (fieldValue == null) {
+            return "NULL";
+        }
+        if (fieldValue instanceof GenericRecord) {
+            return genericObjectToMap((GenericRecord) fieldValue, displayHex);
+        }
+        if (fieldValue instanceof KeyValue) {
+            return keyValueToMap((KeyValue<?, ?>) fieldValue, displayHex);
+        }
+        if (fieldValue instanceof byte[]) {
+            return interpretByteArray(displayHex, (byte[]) fieldValue);
+        }
+        return fieldValue;
+    }
+
/**
* Build a consumer-side decryption policy from a {@code file://} key URI,
mirroring the v4
* {@code defaultCryptoKeyReader(uri)} semantics: the private key is
loaded once and returned
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4c7164d7db3..50f52a84ad1 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -179,11 +179,13 @@ public class CmdConsume extends AbstractCmdConsume {
int numMessagesConsumed = 0;
int returnCode = 0;
+ final Schema<?> schema;
if ("auto_consume".equals(schemaType)) {
- throw new IllegalArgumentException("schema type 'auto_consume' is
not supported by this "
- + "version of pulsar-client; consume with 'bytes' (the
default).");
- } else if (!"bytes".equals(schemaType)) {
- throw new IllegalArgumentException("schema type must be 'bytes'");
+ schema = Schema.autoConsume();
+ } else if ("bytes".equals(schemaType)) {
+ schema = Schema.bytes();
+ } else {
+ throw new IllegalArgumentException("schema type must be 'bytes' or
'auto_consume'");
}
if (!poolMessages) {
LOG.info("--pool-messages has no effect on this version of
pulsar-client.");
@@ -208,7 +210,7 @@ public class CmdConsume extends AbstractCmdConsume {
try (PulsarClient client = clientBuilder.build()) {
RateLimiter limiter = (this.consumeRate > 0) ?
RateLimiter.create(this.consumeRate) : null;
- QueueConsumerBuilder<byte[]> builder =
client.newQueueConsumer(Schema.bytes())
+ QueueConsumerBuilder<?> builder = client.newQueueConsumer(schema)
.subscriptionName(this.subscriptionName)
.subscriptionInitialPosition(subscriptionInitialPosition)
.replicateSubscriptionState(replicateSubscriptionState);
@@ -220,12 +222,12 @@ public class CmdConsume extends AbstractCmdConsume {
}
applyTopicSelection(builder::topic, builder::namespace);
- try (QueueConsumer<byte[]> consumer = builder.subscribe()) {
+ try (QueueConsumer<?> consumer = builder.subscribe()) {
while (this.numMessagesToConsume == 0 || numMessagesConsumed <
this.numMessagesToConsume) {
if (limiter != null) {
limiter.acquire();
}
- Message<byte[]> msg =
consumer.receive(Duration.ofSeconds(5));
+ Message<?> msg = consumer.receive(Duration.ofSeconds(5));
if (msg == null) {
LOG.debug("No message to consume after waiting for 5
seconds.");
} else {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index 63ecb4e4e76..33a5c8e9036 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -139,11 +139,13 @@ public class CmdRead extends AbstractCmdConsume {
int numMessagesRead = 0;
int returnCode = 0;
+ final Schema<?> schema;
if ("auto_consume".equals(schemaType)) {
- throw new IllegalArgumentException("schema type 'auto_consume' is
not supported by this "
- + "version of pulsar-client; read with 'bytes' (the
default).");
- } else if (!"bytes".equals(schemaType)) {
- throw new IllegalArgumentException("schema type must be 'bytes'");
+ schema = Schema.autoConsume();
+ } else if ("bytes".equals(schemaType)) {
+ schema = Schema.bytes();
+ } else {
+ throw new IllegalArgumentException("schema type must be 'bytes' or
'auto_consume'");
}
if (!poolMessages) {
LOG.info("--pool-messages has no effect on this version of
pulsar-client.");
@@ -160,21 +162,21 @@ public class CmdRead extends AbstractCmdConsume {
? Checkpoint.earliest() : Checkpoint.latest();
try (PulsarClient client = clientBuilder.build()) {
- CheckpointConsumerBuilder<byte[]> builder =
client.newCheckpointConsumer(Schema.bytes())
+ CheckpointConsumerBuilder<?> builder =
client.newCheckpointConsumer(schema)
.topic(topic)
.startPosition(startPosition);
if (isNotBlank(this.encKeyValue)) {
builder.encryptionPolicy(buildConsumerEncryptionPolicy());
}
- try (CheckpointConsumer<byte[]> reader = builder.create()) {
+ try (CheckpointConsumer<?> reader = builder.create()) {
RateLimiter limiter = (this.readRate > 0) ?
RateLimiter.create(this.readRate) : null;
while (this.numMessagesToRead == 0 || numMessagesRead <
this.numMessagesToRead) {
if (limiter != null) {
limiter.acquire();
}
- Message<byte[]> msg =
reader.receive(Duration.ofSeconds(5));
+ Message<?> msg = reader.receive(Duration.ofSeconds(5));
if (msg == null) {
LOG.debug("No message to read after waiting for 5
seconds.");
} else {
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java
new file mode 100644
index 00000000000..61371d95fbb
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/GenericRecordV5.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.List;
+import org.apache.pulsar.client.api.v5.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
+
+/**
+ * Exposes a v4 {@link org.apache.pulsar.client.api.schema.GenericRecord} (for example one decoded by
+ * the v4 {@code AUTO_CONSUME} schema) through the v5 {@link GenericRecord} interface. Values returned
+ * by {@link #nativeObject()} and {@link #field(String)} are translated at call time, so nested v4
+ * records and v4 {@code KeyValue}s reach v5 callers as their v5 counterparts.
+ */
+final class GenericRecordV5 implements GenericRecord {
+
+    /** The wrapped v4 record; every accessor reads through to it. */
+    private final org.apache.pulsar.client.api.schema.GenericRecord delegate;
+
+    /** v5 copies of the v4 field descriptors, taken once at construction. */
+    private final List<Field> v5Fields;
+
+    private GenericRecordV5(org.apache.pulsar.client.api.schema.GenericRecord delegate) {
+        this.delegate = delegate;
+        this.v5Fields = delegate.getFields().stream()
+                .map(v4Field -> new Field(v4Field.getName(), v4Field.getIndex()))
+                .toList();
+    }
+
+    /**
+     * Translate a decoded value into the form exposed by the v5 API:
+     * <ul>
+     *   <li>a v4 {@code GenericRecord} is wrapped in a new {@link GenericRecordV5};</li>
+     *   <li>a v4 {@code KeyValue} becomes a v5 {@link KeyValue}, translating key then value
+     *       recursively;</li>
+     *   <li>anything else (primitives, {@code byte[]}, {@code null}, ...) is returned as is.</li>
+     * </ul>
+     *
+     * @param value the decoded value, possibly {@code null}
+     * @return the v5-facing value
+     */
+    static Object convert(Object value) {
+        if (value instanceof org.apache.pulsar.client.api.schema.GenericRecord v4Nested) {
+            return new GenericRecordV5(v4Nested);
+        }
+        if (!(value instanceof org.apache.pulsar.common.schema.KeyValue<?, ?> v4Pair)) {
+            return value;
+        }
+        Object convertedKey = convert(v4Pair.getKey());
+        Object convertedValue = convert(v4Pair.getValue());
+        return new KeyValue<>(convertedKey, convertedValue);
+    }
+
+    @Override
+    public SchemaType schemaType() {
+        org.apache.pulsar.common.schema.SchemaType v4Type = delegate.getSchemaType();
+        if (v4Type != null) {
+            try {
+                return SchemaType.valueOf(v4Type.name());
+            } catch (IllegalArgumentException ignored) {
+                // Some v4 primitive types (e.g. the date/time variants) have no v5 constant of the
+                // same name. Consumers handle them through the generic primitive path, so falling
+                // through to NONE is a safe mapping.
+            }
+        }
+        return SchemaType.NONE;
+    }
+
+    @Override
+    public Object nativeObject() {
+        Object v4Native = delegate.getNativeObject();
+        return convert(v4Native);
+    }
+
+    @Override
+    public List<Field> fields() {
+        return v5Fields;
+    }
+
+    @Override
+    public Object field(String fieldName) {
+        Object v4Value = delegate.getField(fieldName);
+        return convert(v4Value);
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
index 1a9444911f4..b349c941c7f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.v5;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.v5.Message;
import org.apache.pulsar.client.api.v5.MessageId;
@@ -84,8 +85,15 @@ final class MessageV5<T> implements Message<T> {
}
@Override
+ @SuppressWarnings("unchecked")
public T value() {
- return v4Message.getValue();
+ Object value = v4Message.getValue();
+ // The v4 AUTO_CONSUME / generic schemas decode into a v4 GenericRecord (the GenericRecord
+ // imported by this file); surface it through the v5 API as a v5 GenericRecord. Only that case
+ // is converted: every other value, including a top-level v4 KeyValue, passes through unchanged
+ // even though GenericRecordV5.convert would also translate a KeyValue. The (T) casts are
+ // unchecked: nothing here verifies the value against T.
+ if (value instanceof GenericRecord) {
+ return (T) GenericRecordV5.convert(value);
+ }
+ return (T) value;
}
@Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 0439f67b285..fe8ac87a70d 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.auth.Authentication;
import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
import org.apache.pulsar.client.api.v5.schema.Schema;
/**
@@ -136,6 +137,16 @@ public final class PulsarClientProviderV5 implements
PulsarClientProvider {
return
SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(v4Base));
}
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public Schema<GenericRecord> autoConsumeSchema() {
+ // Wrap the genuine v4 AUTO_CONSUME schema so the v5 consumer passes
it straight through to
+ // the v4 layer (which special-cases AutoConsumeSchema for runtime
schema fetching). The
+ // decoded v4 GenericRecord is converted to a v5 GenericRecord in
MessageV5#value().
+ return SchemaAdapter.toV5((org.apache.pulsar.client.api.Schema)
+ org.apache.pulsar.client.api.Schema.AUTO_CONSUME());
+ }
+
// --- Checkpoint ---
@Override
diff --git
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
index db4cc2b6ec6..75a844c58db 100644
---
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
+++
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
@@ -20,10 +20,17 @@ package org.apache.pulsar.client.impl.v5;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.v5.schema.GenericRecord;
+import org.apache.pulsar.client.api.v5.schema.KeyValue;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
import org.apache.pulsar.client.api.v5.schema.SchemaType;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.testng.annotations.Test;
/**
@@ -65,4 +72,81 @@ public class SchemaFactoryTest {
byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
assertEquals(wrapped.encode(payload), payload);
}
+
+    @Test
+    public void testAutoConsumeUnwrapsToV4AutoConsumeSchema() {
+        // The v4 consumer special-cases AutoConsumeSchema to fetch the topic schema at runtime, so the
+        // v5 auto-consume schema must unwrap to exactly that v4 class.
+        Schema<GenericRecord> autoConsume = Schema.autoConsume();
+        assertNotNull(autoConsume);
+        // Until it has fetched the topic's schema, the v4 AutoConsumeSchema has no SchemaInfo.
+        // Only the concrete type of the unwrapped v4 schema is checked.
+        Object unwrapped = SchemaAdapter.toV4(autoConsume);
+        assertTrue(unwrapped instanceof AutoConsumeSchema);
+    }
+
+    @Test
+    public void testGenericRecordConversion() {
+        // A nested v4 GenericRecord field is surfaced as a v5 GenericRecord, and the v4 field
+        // descriptors are mapped onto v5 Field records with name and index preserved.
+        org.apache.pulsar.client.api.schema.GenericRecord v4Nested = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.AVRO,
+                List.of(new Field("inner", 0)), Map.of("inner", "x"), null);
+        org.apache.pulsar.client.api.schema.GenericRecord v4Outer = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.AVRO,
+                List.of(new Field("a", 0), new Field("nested", 1)),
+                Map.of("a", "hello", "nested", v4Nested), null);
+
+        GenericRecord outer = (GenericRecord) GenericRecordV5.convert(v4Outer);
+        assertEquals(outer.schemaType(), SchemaType.AVRO);
+        assertEquals(outer.fields().size(), 2);
+        assertEquals(outer.fields().get(0).name(), "a");
+        assertEquals(outer.fields().get(0).index(), 0);
+        assertEquals(outer.fields().get(1).name(), "nested");
+        assertEquals(outer.fields().get(1).index(), 1);
+        assertEquals(outer.field("a"), "hello");
+        // The Field-based accessor resolves by name and must agree with the String-based one.
+        assertEquals(outer.field(outer.fields().get(0)), "hello");
+        assertTrue(outer.field("nested") instanceof GenericRecord);
+        assertEquals(((GenericRecord) outer.field("nested")).field("inner"), "x");
+    }
+
+    @Test
+    public void testKeyValueConversion() {
+        // A KEY_VALUE wrapper's native object is surfaced as a v5 KeyValue; primitive sides pass
+        // through unchanged.
+        org.apache.pulsar.common.schema.KeyValue<String, String> v4Kv =
+                new org.apache.pulsar.common.schema.KeyValue<>("k", "v");
+        org.apache.pulsar.client.api.schema.GenericRecord v4Record = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.KEY_VALUE, List.of(), Map.of(), v4Kv);
+
+        GenericRecord record = (GenericRecord) GenericRecordV5.convert(v4Record);
+        assertEquals(record.schemaType(), SchemaType.KEY_VALUE);
+        assertTrue(record.fields().isEmpty());
+        assertTrue(record.nativeObject() instanceof KeyValue);
+        KeyValue<?, ?> kv = (KeyValue<?, ?>) record.nativeObject();
+        assertEquals(kv.key(), "k");
+        assertEquals(kv.value(), "v");
+
+        // Both sides are converted recursively: a structured v4 value becomes a v5 GenericRecord.
+        org.apache.pulsar.client.api.schema.GenericRecord v4Value = v4Record(
+                org.apache.pulsar.common.schema.SchemaType.JSON,
+                List.of(new Field("f", 0)), Map.of("f", "y"), null);
+        Object converted = GenericRecordV5.convert(
+                new org.apache.pulsar.common.schema.KeyValue<>("k", v4Value));
+        assertTrue(converted instanceof KeyValue);
+        Object convertedValue = ((KeyValue<?, ?>) converted).value();
+        assertTrue(convertedValue instanceof GenericRecord);
+        assertEquals(((GenericRecord) convertedValue).field("f"), "y");
+    }
+
+    /**
+     * Builds a minimal in-memory v4 GenericRecord that serves the given fields, field values and
+     * native object, so GenericRecordV5 can be exercised without a real schema.
+     */
+    private static org.apache.pulsar.client.api.schema.GenericRecord v4Record(
+            org.apache.pulsar.common.schema.SchemaType type, List<Field> fields,
+            Map<String, Object> values, Object nativeObject) {
+        return new org.apache.pulsar.client.api.schema.GenericRecord() {
+            @Override
+            public org.apache.pulsar.common.schema.SchemaType getSchemaType() {
+                return type;
+            }
+
+            @Override
+            public Object getNativeObject() {
+                return nativeObject;
+            }
+
+            @Override
+            public List<Field> getFields() {
+                return fields;
+            }
+
+            @Override
+            public Object getField(String fieldName) {
+                return values.get(fieldName);
+            }
+
+            @Override
+            public byte[] getSchemaVersion() {
+                // These conversion tests never consult a schema version.
+                return null;
+            }
+        };
+    }
}