geniusjoe opened a new issue, #1448:
URL: https://github.com/apache/pulsar-client-go/issues/1448

   #### Expected behavior
   The producer should not get stuck in `Send()`.
   
   #### Actual behavior
   The producer gets stuck in `Send()` and never recovers.
   
   #### Steps to reproduce
   This test case is similar to the existing `TestChunkBlockIfQueueFull`, except that I removed the timeout from the context passed to `Send()`.
   ```Go
   func TestChunkBlockIfQueueFull(t *testing.T) {
        sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
        client, err := NewClient(ClientOptions{
                URL:    lookupURL,
                Logger: log.NewLoggerWithSlog(sLogger),
        })
        if err != nil {
                t.Fatal(err)
        }
        defer client.Close()
   
        topic := newTopicName()
   
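        // With ChunkMaxMessageSize 10, the 101-byte payload sent below is split
        // into 11 chunks (101 / 10, rounded up). That is one more than the 10
        // permits allowed by MaxPendingMessages.
        producer, err := client.CreateProducer(ProducerOptions{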
                Name:                "test",
                Topic:               topic,
                EnableChunking:      true,
                DisableBatching:     true,
                MaxPendingMessages:  10,
                ChunkMaxMessageSize: 10,
        })
        assert.NoError(t, err)
        assert.NotNil(t, producer)
        defer producer.Close()
   
        // No timeout on the context: Send() blocks forever, so the assertion
        // below is never reached.
        ctx := context.Background()
        _, err = producer.Send(ctx, &ProducerMessage{
                Payload: createTestMessagePayload(101),
        })
        assert.Error(t, err)
   }
   ```
   
   Output:
   ```
   === RUN   TestChunkBlockIfQueueFull
   time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
   time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Wait until connection is ready state=Initializing" remote_addr=pulsar://localhost:6650
   time=2025-12-11T12:08:49.514+08:00 level=INFO msg="Connecting to broker" remote_addr=pulsar://localhost:6650
   time=2025-12-11T12:08:49.515+08:00 level=INFO msg="TCP connection established" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.516+08:00 level=DEBUG msg="Write data: 49" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Got MaxMessageSize from handshake response:5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.517+08:00 level=INFO msg="Connection is ready" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Connection run starting with request capacity=10 queued=0" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{my-topic-514059306} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-514059306} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-514059306} on broker. pulsar://localhost:6650 /  - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" topic=persistent://public/default/my-topic-514059306
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-514059306
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
   time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Write data: 78" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-514059306 cnx="127.0.0.1:49974 -> 127.0.0.1:6650" epoch=0
   time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Created producer" topic=persistent://public/default/my-topic-514059306 producer_name=test producerID=1 cnx="127.0.0.1:49974 -> 127.0.0.1:6650"
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
   ```
   
   #### System configuration
   **Broker version**: apache-pulsar-4.0.8
   **Client version**: master, commit https://github.com/apache/pulsar-client-go/commit/bdfa6a08d6b22ef367bda73b133424c45d710e8f
 
   
   When a chunked message is sent, the current call path is:
   ```text
   producer.Send() ->
   p.internalSendAsync() ->
   p.updateChunkInfo(sr): compute the chunk count, ceil(msg payload size / maxChunkSize) ->
   p.reserveResources() ->
   p.reserveSemaphore(sr): try to acquire one p.publishSemaphore permit per chunk
   ```
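   Plugging the reproduction's parameters into the chunk-count step of this call path gives 11 chunks against 10 permits. The sketch below is a minimal illustration and assumes the chunk count is a plain ceiling division of the payload size by the chunk size. `chunkCount` is a hypothetical helper, not the client's code, and the real client may also account for message metadata when sizing chunks.

   ```go
   package main

   import "fmt"

   // chunkCount is a hypothetical helper: ceil(payloadSize / maxChunkSize)
   // using integer arithmetic. An empty payload still counts as one chunk.
   // Assumes maxChunkSize > 0.
   func chunkCount(payloadSize, maxChunkSize int) int {
   	if payloadSize <= 0 {
   		return 1
   	}
   	return (payloadSize + maxChunkSize - 1) / maxChunkSize
   }

   func main() {
   	const (
   		payloadSize        = 101 // createTestMessagePayload(101)
   		chunkMaxSize       = 10  // ChunkMaxMessageSize
   		maxPendingMessages = 10  // MaxPendingMessages, i.e. semaphore permits
   	)
   	chunks := chunkCount(payloadSize, chunkMaxSize)
   	fmt.Printf("chunks needed: %d, semaphore permits: %d\n", chunks, maxPendingMessages)
   	fmt.Println("can ever be sent:", chunks <= maxPendingMessages)
   }
   ```

   Running it prints `chunks needed: 11, semaphore permits: 10` followed by `can ever be sent: false`.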
   When a message's payload is larger than `maxChunkSize * MaxPendingMessages`, it needs more permits than `p.publishSemaphore` can ever hold. The send takes every available permit of the partition producer's `p.publishSemaphore` and then waits for one more, which is never released. This single message therefore blocks all sending on that partition producer forever.
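   The deadlock can be reproduced in isolation with a toy counting semaphore. This is a minimal sketch. It assumes a semaphore with `MaxPendingMessages` permits that a send acquires one at a time, one per chunk. The client's real `publishSemaphore` type differs, and `toySemaphore` and `acquireN` are hypothetical names. With a context deadline, the acquisition gives up and returns the permits it took, which matches why the existing test with a timeout does not hang. With `context.Background()`, the same call would block forever while holding all 10 permits.

   ```go
   package main

   import (
   	"context"
   	"fmt"
   	"time"
   )

   // toySemaphore is a hypothetical counting semaphore backed by a buffered
   // channel: each value sitting in the channel is one held permit.
   type toySemaphore struct{ permits chan struct{} }

   func newToySemaphore(n int) *toySemaphore {
   	return &toySemaphore{permits: make(chan struct{}, n)}
   }

   // acquireN takes n permits one at a time (one per chunk). If ctx ends
   // before all n are held, it gives back the permits it took and returns
   // false. With a context that never ends and n > capacity, it blocks
   // forever while holding every permit.
   func (s *toySemaphore) acquireN(ctx context.Context, n int) bool {
   	for i := 0; i < n; i++ {
   		select {
   		case s.permits <- struct{}{}:
   		case <-ctx.Done():
   			for j := 0; j < i; j++ {
   				<-s.permits
   			}
   			return false
   		}
   	}
   	return true
   }

   func main() {
   	sem := newToySemaphore(10) // MaxPendingMessages: 10

   	// 11 chunks need 11 permits but only 10 exist, so this can only time out.
   	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
   	defer cancel()
   	fmt.Println("acquired 11 permits:", sem.acquireN(ctx, 11))
   	fmt.Println("permits still held:", len(sem.permits))
   }
   ```

   Running it prints `acquired 11 permits: false` followed by `permits still held: 0`.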
   
   Since a message whose payload exceeds `maxChunkSize * MaxPendingMessages` can never be sent successfully, I believe we should validate the payload size in `p.validateMsg()` or `p.updateChunkInfo()` before trying to acquire `p.publishSemaphore`. If the payload exceeds the limit, we should return an error similar to `ErrMessageTooLarge` directly to the caller of `Send()`.
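   A minimal sketch of the proposed check follows. It assumes the check runs once the chunk count is known and before any permit is taken, and that both limits are positive. `errChunkedMessageTooLarge` and `validateChunkCount` are hypothetical names for illustration. An actual fix might reuse `ErrMessageTooLarge` or place the check in `p.validateMsg()` instead.

   ```go
   package main

   import (
   	"errors"
   	"fmt"
   )

   // errChunkedMessageTooLarge is a hypothetical error, similar in spirit to
   // ErrMessageTooLarge, for a chunked message that can never fit the queue.
   var errChunkedMessageTooLarge = errors.New("chunked message needs more permits than MaxPendingMessages")

   // validateChunkCount is a hypothetical pre-check. It fails fast when a
   // message would need more semaphore permits (one per chunk) than the
   // pending-message queue can ever hold, instead of blocking.
   // Assumes maxChunkSize > 0 and maxPendingMessages > 0.
   func validateChunkCount(payloadSize, maxChunkSize, maxPendingMessages int) error {
   	chunks := (payloadSize + maxChunkSize - 1) / maxChunkSize // ceiling division
   	if chunks < 1 {
   		chunks = 1 // an empty payload still occupies one chunk
   	}
   	if chunks > maxPendingMessages {
   		return fmt.Errorf("%w: %d chunks, %d permits", errChunkedMessageTooLarge, chunks, maxPendingMessages)
   	}
   	return nil
   }

   func main() {
   	// Reproduction parameters: 101-byte payload, 10-byte chunks, 10 permits.
   	err := validateChunkCount(101, 10, 10)
   	fmt.Println(err)
   	fmt.Println(errors.Is(err, errChunkedMessageTooLarge))
   	fmt.Println(validateChunkCount(100, 10, 10)) // exactly 10 chunks: fits
   }
   ```

   Failing before `p.reserveSemaphore()` means the caller gets an error immediately and no permits are consumed. Other sends on the same partition producer are therefore unaffected.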

