This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 21bd87e  Improve error log for frame size too big and maxMessageSize 
(#459)
21bd87e is described below

commit 21bd87eefc7394321c4ff0fc8d1bfe6de7139d23
Author: Ming <[email protected]>
AuthorDate: Fri Mar 5 04:39:32 2021 -0500

    Improve error log for frame size too big and maxMessageSize (#459)
    
    ### Motivation
    This PR improves the error log for `frame size too big` on the consumer. 
We have seen a number of `frame size too big` errors, and I would like to trace 
the exact maxMessageSize value in effect when it is not the default. Debug 
logging could not be enabled on the production system.
    
    
    ### Modifications
    
    Mostly log improvements.
---
 pulsar/internal/connection.go        |  2 +-
 pulsar/internal/connection_reader.go | 11 +++++++----
 pulsar/producer_partition.go         |  2 +-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6619fe6..433c8a8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -302,7 +302,7 @@ func (c *connection) doHandshake() bool {
                        cmd.Error.GetMessage())
                return false
        }
-       if cmd.Connected.MaxMessageSize != nil {
+       if cmd.Connected.MaxMessageSize != nil && *cmd.Connected.MaxMessageSize 
> 0 {
                c.log.Debug("Got MaxMessageSize from handshake response:", 
*cmd.Connected.MaxMessageSize)
                c.maxMessageSize = *cmd.Connected.MaxMessageSize
        } else {
diff --git a/pulsar/internal/connection_reader.go 
b/pulsar/internal/connection_reader.go
index 8035324..62a6526 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -19,6 +19,7 @@ package internal
 
 import (
        "bufio"
+       "fmt"
        "io"
 
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -54,7 +55,7 @@ func (r *connectionReader) readFromConnection() {
                if headersAndPayload != nil {
                        payloadLen = headersAndPayload.ReadableBytes()
                }
-               r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", 
payloadLen)
+               r.cnx.log.Debug("Got command! ", cmd, " with payload size: ", 
payloadLen, " maxMsgSize: ", r.cnx.maxMessageSize)
                r.cnx.receivedCommand(cmd, headersAndPayload)
        }
 }
@@ -73,10 +74,12 @@ func (r *connectionReader) readSingleCommand() (cmd 
*pb.BaseCommand, headersAndP
 
        // We have enough to read frame size
        frameSize := r.buffer.ReadUint32()
-       if r.cnx.maxMessageSize != 0 && int32(frameSize) > 
(r.cnx.maxMessageSize+MessageFramePadding) {
-               r.cnx.log.Warnf("Received too big frame size. size=%d", 
frameSize)
+       maxFrameSize := r.cnx.maxMessageSize + MessageFramePadding
+       if r.cnx.maxMessageSize != 0 && int32(frameSize) > maxFrameSize {
+               frameSizeError := fmt.Errorf("received too big frame size=%d 
maxFrameSize=%d", frameSize, maxFrameSize)
+               r.cnx.log.Error(frameSizeError)
                r.cnx.TriggerClose()
-               return nil, nil, errors.New("Frame size too big")
+               return nil, nil, frameSizeError
        }
 
        // Next, we read the rest of the frame
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 02ee7cf..34def20 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -351,7 +351,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                p.log.WithError(errMessageTooLarge).
                        WithField("size", len(payload)).
                        WithField("properties", msg.Properties).
-                       Error()
+                       Errorf("MaxMessageSize %d", 
int(p.cnx.GetMaxMessageSize()))
                p.metrics.PublishErrorsMsgTooLarge.Inc()
                return
        }

Reply via email to