This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8d49acb5b44 [fix][broker] Add schema version in rest produce api (#25004)
8d49acb5b44 is described below
commit 8d49acb5b4441528d7527caffa55b9b2f39c3bb9
Author: Oneby <[email protected]>
AuthorDate: Wed Nov 26 13:24:04 2025 +0800
[fix][broker] Add schema version in rest produce api (#25004)
---
.../org/apache/pulsar/broker/rest/TopicsBase.java | 7 +--
.../org/apache/pulsar/broker/admin/TopicsTest.java | 63 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 47067c55005..425e715a1e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -196,7 +196,7 @@ public class TopicsBase extends PersistentTopicsBase {
try {
String producerName = (null == request.getProducerName() ||
request.getProducerName().isEmpty())
? defaultProducerName : request.getProducerName();
- List<Message> messages = buildMessage(request, schema, producerName, topicName);
+ List<Message> messages = buildMessage(request, schema, producerName, topicName, schemaVersion);
List<CompletableFuture<Position>> publishResults = new
ArrayList<>();
List<ProducerAck> produceMessageResults = new ArrayList<>();
for (int index = 0; index < messages.size(); index++) {
@@ -237,7 +237,7 @@ public class TopicsBase extends PersistentTopicsBase {
try {
String producerName = (null == request.getProducerName() ||
request.getProducerName().isEmpty())
? defaultProducerName : request.getProducerName();
- List<Message> messages = buildMessage(request, schema, producerName, topicName);
+ List<Message> messages = buildMessage(request, schema, producerName, topicName, schemaVersion);
List<CompletableFuture<Position>> publishResults = new
ArrayList<>();
List<ProducerAck> produceMessageResults = new ArrayList<>();
// Try to publish messages to all partitions this broker owns in
round robin mode.
@@ -627,7 +627,7 @@ public class TopicsBase extends PersistentTopicsBase {
// Build pulsar message from REST request.
private List<Message> buildMessage(ProducerMessages producerMessages,
Schema schema,
- String producerName, TopicName topicName) {
+ String producerName, TopicName topicName, SchemaVersion schemaVersion) {
List<ProducerMessage> messages;
List<Message> pulsarMessages = new ArrayList<>();
@@ -637,6 +637,7 @@ public class TopicsBase extends PersistentTopicsBase {
messageMetadata.setProducerName(producerName);
messageMetadata.setPublishTime(System.currentTimeMillis());
messageMetadata.setSequenceId(message.getSequenceId());
+ messageMetadata.setSchemaVersion(schemaVersion.bytes());
if (null != message.getReplicationClusters()) {
messageMetadata.addAllReplicateTos(message.getReplicationClusters());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 72426110224..bcbd133ce5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAcks;
import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -868,4 +869,66 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
+ "kv.encoding.type=SEPARATED, key.schema.type=STRING}) to
topic persistent:"
+ "//my-tenant/my-namespace/my-topic"));
}
+
+ @Test
+ public void testProduceWithAutoConsumeSchema() throws Exception {
+ String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName;
+ admin.topics().createNonPartitionedTopic(topicName);
+ GenericSchema jsonSchema =
GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder()
+ .withPojo(PC.class).build()).getSchemaInfo());
+ // use producer to create topic schema
+ Producer producer =
pulsarClient.newProducer(jsonSchema).topic(topicName).create();
+ producer.close();
+
+ PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+ new Seller("WA", "main street", 98004));
+ PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
+ new Seller("CA", "back street", 90232));
+
+ @Cleanup
+ Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topicName)
+ .subscriptionName("auto-schema-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ SchemaInfoWithVersion schemaInfoWithVersion =
admin.schemas().getSchemaInfoWithVersion(topicName);
+ AsyncResponse asyncResponse = mock(AsyncResponse.class);
+ ProducerMessages producerMessages = new ProducerMessages();
+ producerMessages.setSchemaVersion(schemaInfoWithVersion.getVersion());
+ String message = "["
+ + "{\"key\":\"my-key\",\"payload\":\""
+ +
ObjectMapperFactory.getMapper().writer().writeValueAsString(pc).replace("\"",
"\\\"")
+ + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+ + "{\"key\":\"my-key\",\"payload\":\""
+ +
ObjectMapperFactory.getMapper().writer().writeValueAsString(anotherPc).replace("\"",
"\\\"")
+ + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
+ producerMessages.setMessages(createMessages(message));
+ topics.produceOnPersistentTopic(asyncResponse, testTenant,
testNamespace, testTopicName, false,
+ producerMessages);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(asyncResponse,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.OK.getStatusCode());
+
+ List<PC> expected = Arrays.asList(pc, anotherPc);
+ for (int i = 0; i < 2; i++) {
+ Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertTrue(msg.getValue() instanceof GenericJsonRecord);
+ GenericJsonRecord genericJsonRecord = (GenericJsonRecord)
msg.getValue();
+ Assert.assertEquals(genericJsonRecord.getField("brand"),
expected.get(i).getBrand());
+ Assert.assertEquals(genericJsonRecord.getField("model"),
expected.get(i).getModel());
+ Number year = (Number) genericJsonRecord.getField("year");
+ Assert.assertEquals(year.intValue(), expected.get(i).getYear());
+ Assert.assertEquals(genericJsonRecord.getField("gpu"),
expected.get(i).getGpu().name());
+ Object seller = genericJsonRecord.getField("seller");
+ Assert.assertTrue(seller instanceof GenericJsonRecord);
+ GenericJsonRecord sellerGenericJsonRecord = (GenericJsonRecord)
seller;
+ Assert.assertEquals(sellerGenericJsonRecord.getField("state"),
expected.get(i).getSeller().getState());
+ Assert.assertEquals(sellerGenericJsonRecord.getField("street"),
expected.get(i).getSeller().getStreet());
+ Number zipCode = (Number)
sellerGenericJsonRecord.getField("zipCode");
+ Assert.assertEquals(zipCode.longValue(),
expected.get(i).getSeller().getZipCode());
+ }
+ }
+
}