congbobo184 commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r637487537
##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
##########
@@ -781,4 +786,181 @@ public void testNullKey() throws Exception {
assertEquals("foo", message.getValue());
}
+ public void testConsumeMultipleSchemaMessages() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+ admin.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String autoProducerTopic = getTopicName(ns,
"auto_produce_topic");
+ Producer<byte[]> autoProducer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(autoProducerTopic)
+ .create();
+
+ AtomicInteger totalMsgCnt = new AtomicInteger(0);
+ generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES, "bytes
value".getBytes(),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "string_schema", Schema.STRING,
"string value",
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_one_schema",
Schema.JSON(Schemas.PersonOne.class),
+ new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_three_schema",
Schema.JSON(Schemas.PersonThree.class),
+ new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_four_schema",
Schema.JSON(Schemas.PersonFour.class),
+ new Schemas.PersonFour(4, "tang", 18), autoProducer,
totalMsgCnt);
+ generateDataByDifferentSchema(ns, "avro_one_schema",
Schema.AVRO(Schemas.PersonOne.class),
+ new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonThree.class),
KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(new Schemas.PersonOne(1), new
Schemas.PersonThree(3, "kv-separate")),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonFour.class),
KeyValueEncodingType.INLINE),
+ new KeyValue<>(new Schemas.PersonOne(10), new
Schemas.PersonFour(30, "kv-inline", 20)),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
+ Schema.KeyValue(Schema.INT32,
Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(100, new Schemas.PersonThree(40,
"kv-separate")),
+ autoProducer, totalMsgCnt);
+
+ Consumer<GenericRecord> autoConsumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(autoProducerTopic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message;
+ for (int i = 0; i < totalMsgCnt.get(); i++) {
+ message = autoConsumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive multiple schema message.");
+ }
+ log.info("auto consumer get native object class: {}, value: {}",
+ message.getValue().getNativeObject().getClass(),
message.getValue().getNativeObject());
+ checkSchemaForAutoSchema(i, message);
+ }
+ }
+
+ private String getTopicName(String ns, String baseTopic) {
+ return ns + "/" + baseTopic;
+ }
+
+ private void generateDataByDifferentSchema(String ns,
+ String baseTopic,
+ Schema schema,
+ Object data,
+ Producer<?> autoProducer,
+ AtomicInteger totalMsgCnt)
throws PulsarClientException {
+ String topic = getTopicName(ns, baseTopic);
+ Producer<Object> producer = pulsarClient.newProducer(schema)
+ .topic(topic)
+ .create();
+ producer.newMessage().value(data).property("baseTopic",
baseTopic).send();
+ totalMsgCnt.incrementAndGet();
+
+ Consumer<GenericRecord> consumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive message for topic " + topic);
+ }
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for topic " + topic);
+ }
+ message.getValue();
+
+ Schema<?> readerSchema = message.getReaderSchema().get();
+ if (readerSchema instanceof KeyValueSchema
+ && ((KeyValueSchema<?, ?>) readerSchema)
+
.getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+ autoProducer.newMessage(
+
Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
+ .value(message.getData())
+ .properties(message.getProperties())
+ .send();
+ } else {
+
autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .properties(message.getProperties())
+ .value(message.getData())
+ .send();
+ }
+ producer.close();
+ consumer.close();
+ }
+
+ private void checkSchemaForAutoSchema(int index, Message<GenericRecord>
message) {
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for auto consume multiple
schema topic.");
+ }
+ Object nativeObject = message.getValue().getNativeObject();
+ String baseTopic = message.getProperty("baseTopic");
+ JsonNode jsonNode;
+ KeyValue<?, ?> kv;
+ switch (baseTopic) {
+ case "bytes_schema":
+ Assert.assertEquals(new String((byte[]) nativeObject), "bytes
value");
+ break;
+ case "string_schema":
+ Assert.assertEquals((String) nativeObject, "string value");
+ break;
+ case "bool_schema":
+ Assert.assertEquals(nativeObject, Boolean.TRUE);
+ break;
+ case "json_one_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+ break;
+ case "json_three_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
+ break;
+ case "json_four_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 4);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
+ Assert.assertEquals(jsonNode.get("age").intValue(), 18);
+ break;
+ case "avro_one_schema":
+ org.apache.avro.generic.GenericRecord genericRecord =
Review comment:
Is it right that this type is `PersonOne` and not `GenericAvroRecord`?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +45,97 @@
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final ConcurrentMap<SchemaVersion, Schema<?>> schemaMap =
initSchemaMap();
private String topicName;
private String componentName;
private SchemaInfoProvider schemaInfoProvider;
+ private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
+ ConcurrentMap<SchemaVersion, Schema<?>> schemaMap =
Maps.newConcurrentMap();
+ // The Schema.BYTES will not be uploaded to the broker and store in
the schema storage,
+ // if the schema version in the message metadata is empty byte[], it
means its schema is Schema.BYTES.
+ schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+ return schemaMap;
+ }
+
+ public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+ schemaMap.put(schemaVersion, schema);
+ }
+
public void setSchema(Schema<?> schema) {
- this.schema = schema;
+ schemaMap.put(SchemaVersion.Latest, schema);
}
- private void ensureSchemaInitialized() {
- checkState(null != schema, "Schema is not initialized before used");
+ private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+ checkState(schemaMap.containsKey(schemaVersion),
+ "Schema version " + schemaVersion + " is not initialized
before used");
}
@Override
public void validate(byte[] message) {
- ensureSchemaInitialized();
+ ensureSchemaInitialized(SchemaVersion.Latest);
- schema.validate(message);
+ schemaMap.get(SchemaVersion.Latest).validate(message);
}
@Override
public byte[] encode(GenericRecord message) {
- ensureSchemaInitialized();
-
throw new UnsupportedOperationException("AutoConsumeSchema is not
intended to be used for encoding");
}
@Override
public boolean supportSchemaVersioning() {
- return schema == null || schema.supportSchemaVersioning();
+ return true;
}
public Schema<?> atSchemaVersion(byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- if (schema.supportSchemaVersioning() && schema instanceof
AbstractSchema) {
- return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ Schema<?> topicVersionedSchema = schemaMap.get(sv);
+ if (topicVersionedSchema.supportSchemaVersioning() &&
topicVersionedSchema instanceof AbstractSchema) {
+ return ((AbstractSchema<?>)
topicVersionedSchema).atSchemaVersion(schemaVersion);
} else {
- return schema;
+ return topicVersionedSchema;
}
}
@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ return adapt(schemaMap.get(sv).decode(bytes, schemaVersion),
schemaVersion);
Review comment:
Maybe we don't need decode(bytes, schemaVersion) here and can just use decode(bytes)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]