sijie commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325781020
##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -430,6 +447,99 @@ public void sendAsync(Message<T> message, SendCallback callback) {
         }
     }
+    private boolean fillMessageSchema(MessageMetadata.Builder msgMetadataBuilder,
+                                      Schema msgSchema,
+                                      SendCallback callback) {
+        if (msgSchema == schema) {
+            schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+            return true;
+        }
+        if (!isMultiSchemaEnabled(true)) {
+            callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                    "Multiple schema disabled"));
+            return false;
+        }
+        byte[] schemaVersion;
+        try {
+            schemaVersion = schemaCache.computeIfAbsent(
+                    SchemaHash.of(msgSchema), (hash) -> {
+                        SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+                                .map(Schema::getSchemaInfo)
+                                .filter(si -> si.getType().getValue() > 0)
+                                .orElse(Schema.BYTES.getSchemaInfo());
+                        try {
+                            return getOrCreateSchemaAsync(schemaInfo).get();

Review comment:

@yittg I think this should be improved. We should avoid calling *sync* methods here: the `.get()` blocks the sending thread until the schema has been registered. A better implementation would be:

- If a new schema is needed, flush all existing pending requests to the broker, because those requests use old schemas that are already registered in the broker.
- Introduce a state for the client (e.g. REGISTERING_SCHEMA) and turn the client into that state. While in it, all requests should be added to the pending queue and wait until the schema is registered in the broker.
- After the schema is successfully registered in the broker, flush all the pending requests to the broker.
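
The steps in the review comment can be sketched roughly as below. This is only an illustration of the idea, not code from the PR or from `ProducerImpl`: `SchemaGate`, `Pending`, `send`, `transmit` and the `registrar` function are all hypothetical names, messages are plain strings, and `registrar` stands in for something like `getOrCreateSchemaAsync`. The sketch transmits messages with known schemas immediately, so the first step (flushing what is already pending) is trivially satisfied; a batching producer would flush its open batch at the point where the state changes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: none of these names exist in the Pulsar client.
final class SchemaGate {
    enum State { READY, REGISTERING_SCHEMA }

    /** A message held back until its schema version is known. */
    private static final class Pending {
        final String payload;
        final String schemaKey;
        Pending(String payload, String schemaKey) {
            this.payload = payload;
            this.schemaKey = schemaKey;
        }
    }

    // Assumed async registration call; stand-in for getOrCreateSchemaAsync.
    private final Function<String, CompletableFuture<byte[]>> registrar;
    private final Map<String, byte[]> versions = new HashMap<>();
    private final Queue<Pending> pending = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>(); // stand-in for the broker connection
    private State state = State.READY;

    SchemaGate(Function<String, CompletableFuture<byte[]>> registrar) {
        this.registrar = registrar;
    }

    synchronized State state() {
        return state;
    }

    synchronized void send(String payload, String schemaKey) {
        if (state == State.REGISTERING_SCHEMA) {
            pending.add(new Pending(payload, schemaKey)); // queue in order, never block
            return;
        }
        if (versions.containsKey(schemaKey)) {
            transmit(payload); // schema already registered: send right away
            return;
        }
        pending.add(new Pending(payload, schemaKey)); // first message of a new schema
        startRegistration(schemaKey);
    }

    private void startRegistration(String schemaKey) {
        state = State.REGISTERING_SCHEMA;
        registrar.apply(schemaKey)
                .whenComplete((version, error) -> onRegistered(schemaKey, version, error));
    }

    private synchronized void onRegistered(String schemaKey, byte[] version, Throwable error) {
        state = State.READY;
        if (error != null) {
            pending.poll(); // drop the requesting message; real code would fail its callback
        } else {
            versions.put(schemaKey, version);
        }
        drain();
    }

    // Flush queued messages in order; pause again if another unknown schema appears.
    private void drain() {
        while (state == State.READY && !pending.isEmpty()) {
            Pending head = pending.peek();
            if (!versions.containsKey(head.schemaKey)) {
                startRegistration(head.schemaKey);
                return;
            }
            pending.poll();
            transmit(head.payload);
        }
    }

    private void transmit(String payload) {
        sent.add(payload);
    }
}
```

With a registrar that returns a not-yet-completed future, two `send` calls for a new schema leave the gate in `REGISTERING_SCHEMA` with nothing transmitted; completing the future flushes both messages in their original order and returns the gate to `READY`. The sending thread never waits on the future, which is the point of the suggestion.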