This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 21bd87e Improve error log for frame size too big and maxMessageSize
(#459)
21bd87e is described below
commit 21bd87eefc7394321c4ff0fc8d1bfe6de7139d23
Author: Ming <[email protected]>
AuthorDate: Fri Mar 5 04:39:32 2021 -0500
Improve error log for frame size too big and maxMessageSize (#459)
### Motivation
This PR improves the error log for `frame size too big` on the consumer.
We have seen a number of `frame size too big` errors, and I would like to trace
the exact value that maxMessageSize is set to when it is not the default. Debug
logging could not be enabled on the production system.
### Modifications
Mostly log improvements.
---
pulsar/internal/connection.go | 2 +-
pulsar/internal/connection_reader.go | 11 +++++++----
pulsar/producer_partition.go | 2 +-
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6619fe6..433c8a8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -302,7 +302,7 @@ func (c *connection) doHandshake() bool {
cmd.Error.GetMessage())
return false
}
- if cmd.Connected.MaxMessageSize != nil {
+ if cmd.Connected.MaxMessageSize != nil && *cmd.Connected.MaxMessageSize
> 0 {
c.log.Debug("Got MaxMessageSize from handshake response:",
*cmd.Connected.MaxMessageSize)
c.maxMessageSize = *cmd.Connected.MaxMessageSize
} else {
diff --git a/pulsar/internal/connection_reader.go
b/pulsar/internal/connection_reader.go
index 8035324..62a6526 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -19,6 +19,7 @@ package internal
import (
"bufio"
+ "fmt"
"io"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -54,7 +55,7 @@ func (r *connectionReader) readFromConnection() {
if headersAndPayload != nil {
payloadLen = headersAndPayload.ReadableBytes()
}
- r.cnx.log.Debug("Got command! ", cmd, " with payload size: ",
payloadLen)
+ r.cnx.log.Debug("Got command! ", cmd, " with payload size: ",
payloadLen, " maxMsgSize: ", r.cnx.maxMessageSize)
r.cnx.receivedCommand(cmd, headersAndPayload)
}
}
@@ -73,10 +74,12 @@ func (r *connectionReader) readSingleCommand() (cmd
*pb.BaseCommand, headersAndP
// We have enough to read frame size
frameSize := r.buffer.ReadUint32()
- if r.cnx.maxMessageSize != 0 && int32(frameSize) >
(r.cnx.maxMessageSize+MessageFramePadding) {
- r.cnx.log.Warnf("Received too big frame size. size=%d",
frameSize)
+ maxFrameSize := r.cnx.maxMessageSize + MessageFramePadding
+ if r.cnx.maxMessageSize != 0 && int32(frameSize) > maxFrameSize {
+ frameSizeError := fmt.Errorf("received too big frame size=%d
maxFrameSize=%d", frameSize, maxFrameSize)
+ r.cnx.log.Error(frameSizeError)
r.cnx.TriggerClose()
- return nil, nil, errors.New("Frame size too big")
+ return nil, nil, frameSizeError
}
// Next, we read the rest of the frame
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 02ee7cf..34def20 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -351,7 +351,7 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
p.log.WithError(errMessageTooLarge).
WithField("size", len(payload)).
WithField("properties", msg.Properties).
- Error()
+ Errorf("MaxMessageSize %d",
int(p.cnx.GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}