Gleiphir2769 commented on code in PR #1071:
URL: https://github.com/apache/pulsar-client-go/pull/1071#discussion_r1278691606


##########
pulsar/producer_partition.go:
##########
@@ -1121,78 +958,301 @@ func (p *partitionProducer) SendAsync(ctx 
context.Context, msg *ProducerMessage,
        p.internalSendAsync(ctx, msg, callback, false)
 }
 
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
-       callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
        if msg == nil {
-               p.log.Error("Message is nil")
-               runCallback(callback, nil, msg, newError(InvalidMessage, 
"Message is nil"))
-               return
+               return newError(InvalidMessage, "Message is nil")
        }
 
        if msg.Value != nil && msg.Payload != nil {
-               p.log.Error("Can not set Value and Payload both")
-               runCallback(callback, nil, msg, newError(InvalidMessage, "Can 
not set Value and Payload both"))
-               return
+               return newError(InvalidMessage, "Can not set Value and Payload 
both")
        }
 
-       // Register transaction operation to transaction and the transaction 
coordinator.
-       var newCallback func(MessageID, *ProducerMessage, error)
-       var txn *transaction
-       if msg.Transaction != nil {
-               transactionImpl := (msg.Transaction).(*transaction)
-               txn = transactionImpl
-               if transactionImpl.state != TxnOpen {
-                       p.log.WithField("state", 
transactionImpl.state).Error("Failed to send message" +
-                               " by a non-open transaction.")
-                       runCallback(callback, nil, msg, newError(InvalidStatus, 
"Failed to send message by a non-open transaction."))
-                       return
+       if p.options.DisableMultiSchema {
+               if msg.Schema != nil && p.options.Schema != nil &&
+                       msg.Schema.GetSchemaInfo().hash() != 
p.options.Schema.GetSchemaInfo().hash() {
+                       p.log.Errorf("The producer %s of the topic %s is 
disabled the `MultiSchema`", p.producerName, p.topic)
+                       return fmt.Errorf("msg schema can not match with 
producer schema")
                }
+       }
 
-               if err := transactionImpl.registerProducerTopic(p.topic); err 
!= nil {
-                       runCallback(callback, nil, msg, err)
-                       return
+       return nil
+}
+
+func (p *partitionProducer) updateSchema(sr *sendRequest) error {
+       var schema Schema
+       var schemaVersion []byte
+       var err error
+
+       if sr.msg.Schema != nil {
+               schema = sr.msg.Schema
+       } else if p.options.Schema != nil {
+               schema = p.options.Schema
+       }
+
+       if schema == nil {
+               return nil
+       }
+
+       schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
+       if schemaVersion == nil {
+               schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
+               if err != nil {
+                       return fmt.Errorf("get schema version fail, err: %w", 
err)
                }
-               if err := transactionImpl.registerSendOrAckOp(); err != nil {
-                       runCallback(callback, nil, msg, err)
-                       return
+               p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
+       }
+
+       sr.schema = schema
+       sr.schemaVersion = schemaVersion
+       return nil
+}
+
+func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
+       // read payload from message
+       sr.uncompressedPayload = sr.msg.Payload
+
+       if sr.msg.Value != nil {
+               if sr.schema == nil {
+                       p.log.Errorf("Schema encode message failed %s", 
sr.msg.Value)
+                       return newError(SchemaFailure, "set schema value 
without setting schema")
                }
-               newCallback = func(id MessageID, producerMessage 
*ProducerMessage, err error) {
-                       runCallback(callback, id, producerMessage, err)
-                       transactionImpl.endSendOrAckOp(err)
+
+               // payload and schema are mutually exclusive
+               // try to get payload from schema value only if payload is not 
set
+               schemaPayload, err := sr.schema.Encode(sr.msg.Value)
+               if err != nil {
+                       p.log.WithError(err).Errorf("Schema encode message 
failed %s", sr.msg.Value)
+                       return newError(SchemaFailure, err.Error())
                }
+
+               sr.uncompressedPayload = schemaPayload
+       }
+
+       sr.uncompressedSize = int64(len(sr.uncompressedPayload))
+       return nil
+}
+
+func (p *partitionProducer) updateMetaData(sr *sendRequest) {
+       deliverAt := sr.msg.DeliverAt
+       if sr.msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(sr.msg.DeliverAfter)
+       }
+
+       sr.mm = p.genMetadata(sr.msg, int(sr.uncompressedSize), deliverAt)
+
+       // set default ReplicationClusters when DisableReplication
+       if sr.msg.DisableReplication {
+               sr.msg.ReplicationClusters = []string{"__local__"}
+       }
+
+       sr.sendAsBatch = !p.options.DisableBatching &&
+               sr.msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() < 0
+
+       if !sr.sendAsBatch {
+               // update sequence id for metadata, make the size of 
msgMetadata more accurate
+               // batch sending will update sequence ID in the BatchBuilder
+               p.updateMetadataSeqID(sr.mm, sr.msg)
+       }
+
+       sr.deliverAt = deliverAt
+}
+
+func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
+       checkSize := sr.uncompressedSize
+       if !sr.sendAsBatch {
+               sr.compressedPayload = p.compressionProvider.Compress(nil, 
sr.uncompressedPayload)
+               sr.compressedSize = len(sr.compressedPayload)
+
+               // set the compress type in msgMetaData
+               compressionType := pb.CompressionType(p.options.CompressionType)
+               if compressionType != pb.CompressionType_NONE {
+                       sr.mm.Compression = &compressionType
+               }
+
+               checkSize = int64(sr.compressedSize)
+       }
+
+       sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))

Review Comment:
   > MaxMessageSize is cached in the conn when a conn is ready, you can check 
the code of connection and connectionPool, this is OK.
   
   You're right. This point looks good to me.
   
   > Actually, p.getOrCreateSchema() will trigger a blocking RPC call. What makes 
SendAsync not a real async func is the fixed-length PendingQueue
   
   But the size of `PendingQueue` is decided by the user. And if the size of 
`dataChan` is set to `MaxPendingMessages`, it will no longer be what limits 
asynchronous sending.
   
   > I don't think memLimit and a fixed-length queue are necessary in a language 
with GC like Java and Go
   
   This is related not only to memory but also to the broker. Here is the 
interface description from the Java client:
   >Set the max size of the queue holding the messages pending to receive an 
acknowledgment from the broker
   
   I think we should find a way to solve the `p.getOrCreateSchema()` problem. 
What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to