sijie commented on a change in pull request #5165: [PIP-43] Support producer to 
send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r325781020
 
 

 ##########
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##########
 @@ -430,6 +447,99 @@ public void sendAsync(Message<T> message, SendCallback 
callback) {
         }
     }
 
+    private boolean fillMessageSchema(MessageMetadata.Builder 
msgMetadataBuilder,
+                                      Schema msgSchema,
+                                      SendCallback callback) {
+        if (msgSchema == schema) {
+            schemaVersion.ifPresent(v -> 
msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+            return true;
+        }
+        if (!isMultiSchemaEnabled(true)) {
+            callback.sendComplete(new 
PulsarClientException.InvalidMessageException(
+                    "Multiple schema disabled"));
+            return false;
+        }
+        byte[] schemaVersion;
+        try {
+            schemaVersion = schemaCache.computeIfAbsent(
+                    SchemaHash.of(msgSchema), (hash) -> {
+                        SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+                                                        
.map(Schema::getSchemaInfo)
+                                                        .filter(si -> 
si.getType().getValue() > 0)
+                                                        
.orElse(Schema.BYTES.getSchemaInfo());
+                        try {
+                            return getOrCreateSchemaAsync(schemaInfo).get();
 
 Review comment:
   @yittg I think this should be improved. We should avoid calling *sync* 
methods. 
   
   A better implementation here is:
   
   - if a new schema is needed, flush out all the existing pending requests to 
the broker. because all the existing pending requests use old schemas that are 
already registered in broker.
   - introduce a state for the client (e.g. REGISTERING_SCHEMA). Turn the 
client into REGISTERING_SCHEMA state. All the requests should be added in the 
pending queue and wait until the schema is registered in the broker.
   - after a schema is successfully registered in the broker, flush all the 
pending requests to the broker.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to