gaoran10 commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r637500307
##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
##########
@@ -781,4 +786,181 @@ public void testNullKey() throws Exception {
assertEquals("foo", message.getValue());
}
+ public void testConsumeMultipleSchemaMessages() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+ admin.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String autoProducerTopic = getTopicName(ns,
"auto_produce_topic");
+ Producer<byte[]> autoProducer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(autoProducerTopic)
+ .create();
+
+ AtomicInteger totalMsgCnt = new AtomicInteger(0);
+ generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES, "bytes
value".getBytes(),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "string_schema", Schema.STRING,
"string value",
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_one_schema",
Schema.JSON(Schemas.PersonOne.class),
+ new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_three_schema",
Schema.JSON(Schemas.PersonThree.class),
+ new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_four_schema",
Schema.JSON(Schemas.PersonFour.class),
+ new Schemas.PersonFour(4, "tang", 18), autoProducer,
totalMsgCnt);
+ generateDataByDifferentSchema(ns, "avro_one_schema",
Schema.AVRO(Schemas.PersonOne.class),
+ new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonThree.class),
KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(new Schemas.PersonOne(1), new
Schemas.PersonThree(3, "kv-separate")),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonFour.class),
KeyValueEncodingType.INLINE),
+ new KeyValue<>(new Schemas.PersonOne(10), new
Schemas.PersonFour(30, "kv-inline", 20)),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
+ Schema.KeyValue(Schema.INT32,
Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(100, new Schemas.PersonThree(40,
"kv-separate")),
+ autoProducer, totalMsgCnt);
+
+ Consumer<GenericRecord> autoConsumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(autoProducerTopic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message;
+ for (int i = 0; i < totalMsgCnt.get(); i++) {
+ message = autoConsumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive multiple schema message.");
+ }
+ log.info("auto consumer get native object class: {}, value: {}",
+ message.getValue().getNativeObject().getClass(),
message.getValue().getNativeObject());
+ checkSchemaForAutoSchema(i, message);
+ }
+ }
+
+ private String getTopicName(String ns, String baseTopic) {
+ return ns + "/" + baseTopic;
+ }
+
+ private void generateDataByDifferentSchema(String ns,
+ String baseTopic,
+ Schema schema,
+ Object data,
+ Producer<?> autoProducer,
+ AtomicInteger totalMsgCnt)
throws PulsarClientException {
+ String topic = getTopicName(ns, baseTopic);
+ Producer<Object> producer = pulsarClient.newProducer(schema)
+ .topic(topic)
+ .create();
+ producer.newMessage().value(data).property("baseTopic",
baseTopic).send();
+ totalMsgCnt.incrementAndGet();
+
+ Consumer<GenericRecord> consumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive message for topic " + topic);
+ }
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for topic " + topic);
+ }
+ message.getValue();
+
+ Schema<?> readerSchema = message.getReaderSchema().get();
+ if (readerSchema instanceof KeyValueSchema
+ && ((KeyValueSchema<?, ?>) readerSchema)
+
.getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+ autoProducer.newMessage(
+
Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
+ .value(message.getData())
+ .properties(message.getProperties())
+ .send();
+ } else {
+
autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .properties(message.getProperties())
+ .value(message.getData())
+ .send();
+ }
+ producer.close();
+ consumer.close();
+ }
+
+ private void checkSchemaForAutoSchema(int index, Message<GenericRecord>
message) {
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for auto consume multiple
schema topic.");
+ }
+ Object nativeObject = message.getValue().getNativeObject();
+ String baseTopic = message.getProperty("baseTopic");
+ JsonNode jsonNode;
+ KeyValue<?, ?> kv;
+ switch (baseTopic) {
+ case "bytes_schema":
+ Assert.assertEquals(new String((byte[]) nativeObject), "bytes
value");
+ break;
+ case "string_schema":
+ Assert.assertEquals((String) nativeObject, "string value");
+ break;
+ case "bool_schema":
+ Assert.assertEquals(nativeObject, Boolean.TRUE);
+ break;
+ case "json_one_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+ break;
+ case "json_three_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
+ break;
+ case "json_four_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 4);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
+ Assert.assertEquals(jsonNode.get("age").intValue(), 18);
+ break;
+ case "avro_one_schema":
+ org.apache.avro.generic.GenericRecord genericRecord =
Review comment:
It seems that if using the `Schema.AUTO_CONSUME()`, the reader doesn't
know the Pojo `PersonOne` so the decoding result is
`org.apache.avro.generic.GenericRecord`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]