This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new af56e603 refactor: factor out validateMsg (#1117)
af56e603 is described below
commit af56e6031d8021ddcff188f94c4d744eba3b3211
Author: tison <[email protected]>
AuthorDate: Tue Oct 24 21:27:02 2023 +0800
refactor: factor out validateMsg (#1117)
Signed-off-by: tison <[email protected]>
---
pulsar/producer_partition.go | 40 +++++++++++++++++++++++++---------------
1 file changed, 25 insertions(+), 15 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 82123f98..62c180ca 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -495,14 +495,6 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
return
}
- if p.options.DisableMultiSchema {
- if msg.Schema != nil && p.options.Schema != nil &&
- msg.Schema.GetSchemaInfo().hash() !=
p.options.Schema.GetSchemaInfo().hash() {
- runCallback(request.callback, nil, request.msg,
fmt.Errorf("msg schema can not match with producer schema"))
- p.log.WithError(err).Errorf("The producer %s of the
topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
- return
- }
- }
var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
@@ -1121,17 +1113,35 @@ func (p *partitionProducer) SendAsync(ctx
context.Context, msg *ProducerMessage,
p.internalSendAsync(ctx, msg, callback, false)
}
-func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
- callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
+func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
- p.log.Error("Message is nil")
- runCallback(callback, nil, msg, newError(InvalidMessage,
"Message is nil"))
- return
+ return newError(InvalidMessage, "Message is nil")
}
if msg.Value != nil && msg.Payload != nil {
- p.log.Error("Can not set Value and Payload both")
- runCallback(callback, nil, msg, newError(InvalidMessage, "Can
not set Value and Payload both"))
+ return newError(InvalidMessage, "Can not set Value and Payload
both")
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema != nil && p.options.Schema != nil &&
+ msg.Schema.GetSchemaInfo().hash() !=
p.options.Schema.GetSchemaInfo().hash() {
+ p.log.Errorf("The producer %s of the topic %s is
disabled the `MultiSchema`", p.producerName, p.topic)
+ return fmt.Errorf("msg schema can not match with
producer schema")
+ }
+ }
+
+ return nil
+}
+
+func (p *partitionProducer) internalSendAsync(
+ ctx context.Context,
+ msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error),
+ flushImmediately bool,
+) {
+ if err := p.validateMsg(msg); err != nil {
+ p.log.Error(err)
+ runCallback(callback, nil, msg, err)
return
}