geniusjoe opened a new issue, #1448:
URL: https://github.com/apache/pulsar-client-go/issues/1448
#### Expected behavior
The producer should not get stuck in `Send()`.
#### Actual behavior
The producer gets stuck in `Send()` and never recovers.
#### Steps to reproduce
This test case is similar to the existing `TestChunkBlockIfQueueFull`, except that I removed the context timeout from the `Send()` call:
```Go
func TestChunkBlockIfQueueFull(t *testing.T) {
	sLogger := slog.New(slog.NewTextHandler(os.Stdout,
		&slog.HandlerOptions{Level: slog.LevelDebug}))
	client, err := NewClient(ClientOptions{
		URL:    lookupURL,
		Logger: log.NewLoggerWithSlog(sLogger),
	})
	if err != nil {
		t.Fatal(err)
	}
	topic := newTopicName()
	producer, err := client.CreateProducer(ProducerOptions{
		Name:                "test",
		Topic:               topic,
		EnableChunking:      true,
		DisableBatching:     true,
		MaxPendingMessages:  10,
		ChunkMaxMessageSize: 10,
	})
	assert.NoError(t, err)
	assert.NotNil(t, producer)
	defer producer.Close()

	// No deadline on ctx: the 101-byte payload needs 11 chunks (101 / 10,
	// rounded up), one permit each, but only 10 permits exist, so Send()
	// never returns and the assertion below is never reached.
	ctx := context.Background()
	_, err = producer.Send(ctx, &ProducerMessage{
		Payload: createTestMessagePayload(101),
	})
	assert.Error(t, err)
}
```
```
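Output (after the producer is created, only PING/PONG keepalive traffic appears and `Send()` never returns):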
```
=== RUN TestChunkBlockIfQueueFull
time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.514+08:00 level=DEBUG msg="Wait until connection is ready state=Initializing" remote_addr=pulsar://localhost:6650
time=2025-12-11T12:08:49.514+08:00 level=INFO msg="Connecting to broker" remote_addr=pulsar://localhost:6650
time=2025-12-11T12:08:49.515+08:00 level=INFO msg="TCP connection established" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.516+08:00 level=DEBUG msg="Write data: 49" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Got MaxMessageSize from handshake response:5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=INFO msg="Connection is ready" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Connection run starting with request capacity=10 queued=0" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.517+08:00 level=DEBUG msg="Write data: 63" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Got command! type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.518+08:00 level=DEBUG msg="Received command: type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{my-topic-514059306} partitioned metadata response: &{Response:type:PARTITIONED_METADATA_RESPONSE partitionMetadataResponse:{partitions:0 request_id:1 response:Success} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Write data: 67" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got command! type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Received command: type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.519+08:00 level=DEBUG msg="Got topic{persistent://public/default/my-topic-514059306} lookup response: &{Response:type:LOOKUP_RESPONSE lookupTopicResponse:{brokerServiceUrl:\"pulsar://localhost:6650\" response:Connect request_id:2 authoritative:true proxy_through_service_url:true} Cnx:0xc00017e300}" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Successfully looked up topic{persistent://public/default/my-topic-514059306} on broker. pulsar://localhost:6650 / - Use proxy: true" serviceURL=pulsar://localhost:6650 serviceURL=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Lookup result: &{pulsar://localhost:6650 pulsar://localhost:6650}" topic=persistent://public/default/my-topic-514059306
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="The partition producer schema is nil" topic=persistent://public/default/my-topic-514059306
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Getting pooled connection" logicalAddr=pulsar://localhost:6650 physicalAddr=pulsar://localhost:6650
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Found connection in pool key=localhost:6650-localhost:6650-0 logical_addr=pulsar://localhost:6650 physical_addr=pulsar://localhost:6650"
time=2025-12-11T12:08:49.520+08:00 level=DEBUG msg="Write data: 78" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Got command! type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=DEBUG msg="Received command: type:PRODUCER_SUCCESS producer_success:{request_id:3 producer_name:\"test\" last_sequence_id:-1 schema_version:\"\" producer_ready:true} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Connected producer" topic=persistent://public/default/my-topic-514059306 cnx="127.0.0.1:49974 -> 127.0.0.1:6650" epoch=0
time=2025-12-11T12:08:49.544+08:00 level=INFO msg="Created producer" topic=persistent://public/default/my-topic-514059306 producer_name=test producerID=1 cnx="127.0.0.1:49974 -> 127.0.0.1:6650"
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PING ping:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PING ping:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Responding to PING request" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Sending PING" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Write data: 13" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Got command! type:PONG pong:{} with payload size: 0 maxMsgSize: 5242880" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received command: type:PONG pong:{} -- payload: <nil>" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
time=2025-12-11T12:09:19.542+08:00 level=DEBUG msg="Received PONG response" remote_addr=pulsar://localhost:6650 local_addr=127.0.0.1:49974
```
#### System configuration
**Broker version**: apache-pulsar-4.0.8
**Client version**: master (https://github.com/apache/pulsar-client-go/commit/bdfa6a08d6b22ef367bda73b133424c45d710e8f)
The call path that leads to the hang when sending a chunked message is:
```Go
producer.Send() ->
  p.internalSendAsync() ->
    p.updateChunkInfo(sr): sets sr.totalChunks = msg payload size / maxChunkSize (rounded up) ->
      p.reserveResources() ->
        p.reserveSemaphore(sr): tries to acquire sr.totalChunks permits from p.publishSemaphore
```
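Plugging in the test case above, and assuming no compression (so the chunk count is based on the raw 101-byte payload) and an effective chunk size equal to `ChunkMaxMessageSize`, the reservation asks for more permits than exist:

```math
\text{totalChunks} = \left\lceil \frac{101}{10} \right\rceil = 11 > 10 = \text{MaxPendingMessages}
```

With these settings, only payloads of at most $10 \times 10 = 100$ bytes fit within the available permits.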
When we produce a message whose payload exceeds `maxChunkSize * MaxPendingMessages`, this single message acquires every permit of the partition producer's `p.publishSemaphore` and then waits for more. Because none of its chunks has been sent yet, no permit is ever released, so the partition producer's send path blocks forever.
Since a message whose payload exceeds `maxChunkSize * MaxPendingMessages` can never be sent successfully, I believe we should validate the payload size in `p.validateMsg()` or `p.updateChunkInfo()` before trying to acquire `p.publishSemaphore`. If the payload exceeds the limit, we should immediately return an error similar to `ErrMessageTooLarge` to the `Send()` caller.