BewareMyPower opened a new issue, #21884: URL: https://github.com/apache/pulsar/issues/21884
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

KSN (the private version of the Kafka protocol handler) based on Pulsar 3.1.0.

### Minimal reproduce step

**Background for KSN**

There is a partitioned topic `public/__kafka/__consumer_offsets` with 50 partitions. In KSN, 50 producers and 50 readers are created on this topic.

The producer builder is created like this:

```java
this.producerBuilder = client.newProducer(schema).enableBatching(true)
        .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
        .maxPendingMessages(maxPendingMessages)
        .sendTimeout(offsetConfig.offsetCommitTimeoutMs(), TimeUnit.MILLISECONDS)
        .blockIfQueueFull(true);
```

The send timeout is 5 seconds.

```java
final var future = producers.computeIfAbsent(partition, __ ->
        producerBuilder.clone().topic(getPartition(partition)).createAsync());
```

I used the Kafka CLI to consume a topic, which committed offsets to `__consumer_offsets-partition-38` every 5 seconds. Each time an offset is committed, a message is sent via a Pulsar producer.

```java
producer.newMessage().keyBytes(key).value(value).eventTime(timestamp).sendAsync().whenComplete(
        (msgId, e) -> {
            if (e == null) {
                future.complete(msgId);
            } else {
                if (e instanceof PulsarClientException.AlreadyClosedException) {
                    // The producer is already closed, we don't need to close it again.
                    producers.remove(partition);
                } else if (e instanceof PulsarClientException.TimeoutException te) {
                    log.warn("Timeout when sending: {}, seq id: {}", te.getMessage(), te.getSequenceId());
                }
                future.completeExceptionally(e);
            }
        });
```

After some time, the offset commit request failed with a timeout error:

```bash
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group
[2024-01-11 23:18:00,649] WARN [Consumer clientId=console-consumer, groupId=my-group] Offset commit failed on partition my-topic-0 at offset 0: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
```

From the broker side logs, we can see:

```
2024-01-11T23:10:23,628+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@1031ff46}}} (38) key: 18 bytes
2024-01-11T23:10:23,630+0800 [pulsar-timer-118-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Message send timed out. Failing 1 messages
2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1
```

### What did you expect to see?

The time between `XYZ storeOffsetMessageAsync` and `Timeout when sending` should be close to 5 seconds, the configured send timeout.

### What did you see instead?

The `XYZ storeOffsetMessageAsync` log, printed just before `sendAsync` is called, has the timestamp 23:10:23,628. The timeout is logged at 23:10:23,631. The interval is only 3 milliseconds.

### Anything else?

There is nothing strange in the logs before the timeout happened.
```
2024-01-11T23:09:53,619+0800 [pulsar-client-io-87-3] INFO org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Created producer on cnx [id: 0x4d70618b, L:/127.0.0.1:63232 - R:/127.0.0.1:6650]
2024-01-11T23:09:53,623+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:09:58,622+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@7346deaf}}} (38) key: 18 bytes
2024-01-11T23:09:58,630+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:03,620+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@624415e4}}} (38) key: 18 bytes
2024-01-11T23:10:03,625+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:08,624+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@b35f1bd}}} (38) key: 18 bytes
2024-01-11T23:10:08,630+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:13,621+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@33f29b11}}} (38) key: 18 bytes
2024-01-11T23:10:13,626+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
2024-01-11T23:10:17,592+0800 [SyncThread-9-1] INFO org.apache.bookkeeper.bookie.Journal - garbage collected journal 18cf7e07713.txn
2024-01-11T23:10:18,624+0800 [pulsar-ph-kafka-224-11] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@76848866}}} (38) key: 18 bytes
2024-01-11T23:10:18,631+0800 [pulsar-client-io-87-3] INFO io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
```

The logs above are all the INFO logs before the timeout error. As you can see, one message is sent every 5 seconds, and all the earlier messages succeeded.

I printed the sequence id of the `TimeoutException` and found that it is -1.

```
2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1
```

It means it failed here: https://github.com/apache/pulsar/blob/176bdeacd309e8c1e49358987a1946abd30ba34a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2032

If the message is already created as an `OpSendMsg`, the sequence id should be positive.

[full-logs.tar.gz](https://github.com/apache/pulsar/files/13905199/full-logs.tar.gz)

I suspect there is something wrong with the logic refactored in: https://github.com/apache/pulsar/pull/14185

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
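As a sanity check on the intervals discussed in this report, here is a minimal sketch that parses the broker log timestamps and computes the gaps between them. The class name `LogGap` and the helper `gapMillis` are hypothetical names used only for illustration, and the timestamp pattern is an assumption inferred from the log lines quoted in this issue.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class LogGap {
    // Assumed pattern for the broker log timestamps quoted in this issue,
    // e.g. 2024-01-11T23:10:23,628+0800
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss,SSSZ");

    // Milliseconds elapsed from the first log timestamp to the second.
    public static long gapMillis(String start, String end) {
        return Duration.between(
                OffsetDateTime.parse(start, LOG_TS),
                OffsetDateTime.parse(end, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // Log printed right before sendAsync vs. the timeout warning for the failed commit.
        long failedSend = gapMillis(
                "2024-01-11T23:10:23,628+0800", "2024-01-11T23:10:23,631+0800");
        // Previous commit's send log vs. the "Message send timed out" log.
        long sincePreviousSend = gapMillis(
                "2024-01-11T23:10:18,624+0800", "2024-01-11T23:10:23,630+0800");

        System.out.println(failedSend);        // prints 3
        System.out.println(sincePreviousSend); // prints 5006
    }
}
```

The first value matches the 3 ms interval reported in this issue. The second only shows that the timeout log appears about one send-timeout period (5 seconds) after the previous commit's send log; it is a comparison of timestamps and does not by itself establish the cause.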
