BewareMyPower opened a new issue, #21884:
URL: https://github.com/apache/pulsar/issues/21884

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   KSN (the private version of the Kafka protocol handler) based on Pulsar 
3.1.0.
   
   ### Minimal reproduce step
   
   **Background for KSN**
   
   There is a partitioned topic `public/__kafka/__consumer_offsets` with 50 partitions, on which KSN creates 50 producers and 50 readers. The producer is created as follows:
   
   ```java
           this.producerBuilder = client.newProducer(schema).enableBatching(true)
                   .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
                   .maxPendingMessages(maxPendingMessages)
                   .sendTimeout(offsetConfig.offsetCommitTimeoutMs(), TimeUnit.MILLISECONDS)
                   .blockIfQueueFull(true);
   ```
   
   The send timeout is 5 seconds.
   
   ```java
               final var future = producers.computeIfAbsent(partition, __ ->
                       producerBuilder.clone().topic(getPartition(partition)).createAsync());
   ```
   
   I used the Kafka CLI to consume a topic; the consumer committed offsets to `__consumer_offsets-partition-38` every 5 seconds. Each offset commit sends one message through a Pulsar producer:
   
   ```java
           producer.newMessage().keyBytes(key).value(value).eventTime(timestamp).sendAsync().whenComplete(
                   (msgId, e) -> {
                       if (e == null) {
                           future.complete(msgId);
                       } else {
                           if (e instanceof PulsarClientException.AlreadyClosedException) {
                               // The producer is already closed, we don't need to close it again.
                               producers.remove(partition);
                           } else if (e instanceof PulsarClientException.TimeoutException te) {
                               log.warn("Timeout when sending: {}, seq id: {}", te.getMessage(), te.getSequenceId());
                           }
                           future.completeExceptionally(e);
                       }
                   });
   ```
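   With `sendTimeout` set to 5 seconds, a message passed to `sendAsync` should only fail with a `TimeoutException` after 5 seconds have elapsed since it was enqueued without being acknowledged. The sketch below models that contract. `PendingSend` is a hypothetical stand-in for a pending message. It is not Pulsar's `OpSendMsg`, and it does not reproduce Pulsar's timer logic.

   ```java
   import java.util.concurrent.TimeUnit;

   /** Hypothetical model of the send-timeout contract; not Pulsar's ProducerImpl logic. */
   public class SendTimeoutModel {

       /** Hypothetical pending message: only records when sendAsync() was called. */
       record PendingSend(long enqueuedAtNanos) {

           /** True once at least timeoutMs milliseconds have passed since the message was enqueued. */
           boolean isExpired(long nowNanos, long timeoutMs) {
               return nowNanos - enqueuedAtNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMs);
           }
       }

       public static void main(String[] args) {
           long timeoutMs = 5_000; // the 5-second send timeout used in this report
           PendingSend msg = new PendingSend(0L);

           // 3 ms after enqueue (the gap seen in the logs): the message should NOT be expired.
           System.out.println(msg.isExpired(TimeUnit.MILLISECONDS.toNanos(3), timeoutMs));     // false
           // 5 s after enqueue: failing the message here is the expected behavior.
           System.out.println(msg.isExpired(TimeUnit.MILLISECONDS.toNanos(5_000), timeoutMs)); // true
       }
   }
   ```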
   
   After some time, the offset commit request failed with a timeout error:
   
   ```bash
   $ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-group
   [2024-01-11 23:18:00,649] WARN [Consumer clientId=console-consumer, groupId=my-group] Offset commit failed on partition my-topic-0 at offset 0: The request timed out. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
   ```
   
   The broker-side logs show:
   
   ```
   2024-01-11T23:10:23,628+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@1031ff46}}} (38) key: 18 bytes
   2024-01-11T23:10:23,630+0800 [pulsar-timer-118-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Message send timed out. Failing 1 messages
   2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN  io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1
   ```
   
   ### What did you expect to see?
   
   The time between `XYZ storeOffsetMessageAsync` and `Timeout when sending` should be close to 5 seconds, which is the configured send timeout.
   
   ### What did you see instead?
   
   The timestamp just before `sendAsync` was called is 23:10:23,628, but the message timed out at 23:10:23,631, only 3 milliseconds later.
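   The gap can be computed directly from the two log timestamps. The sketch below uses `java.time.Duration`. It drops the `+0800` offset because both lines share it, and `LogGap` is a hypothetical helper name.

   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   import java.time.format.DateTimeFormatter;

   public class LogGap {
       // Matches broker log timestamps such as "2024-01-11T23:10:23,628" (offset stripped).
       static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS");

       /** Milliseconds elapsed between two log timestamps. */
       static long gapMillis(String from, String to) {
           return Duration.between(LocalDateTime.parse(from, TS), LocalDateTime.parse(to, TS)).toMillis();
       }

       public static void main(String[] args) {
           String beforeSend = "2024-01-11T23:10:23,628"; // "XYZ storeOffsetMessageAsync ..."
           String timedOut = "2024-01-11T23:10:23,631";   // "Timeout when sending ..."
           long gap = gapMillis(beforeSend, timedOut);
           // Prints: observed gap: 3 ms, configured send timeout: 5000 ms
           System.out.println("observed gap: " + gap + " ms, configured send timeout: 5000 ms");
       }
   }
   ```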
   
   ### Anything else?
   
   Nothing unusual appears in the logs before the timeout:
   
   ```
   2024-01-11T23:09:53,619+0800 [pulsar-client-io-87-3] INFO  org.apache.pulsar.client.impl.ProducerImpl - [public/__kafka/__consumer_offsets-partition-38] [standalone-2804-81] Created producer on cnx [id: 0x4d70618b, L:/127.0.0.1:63232 - R:/127.0.0.1:6650]
   2024-01-11T23:09:53,623+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   2024-01-11T23:09:58,622+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@7346deaf}}} (38) key: 18 bytes
   2024-01-11T23:09:58,630+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   2024-01-11T23:10:03,620+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@624415e4}}} (38) key: 18 bytes
   2024-01-11T23:10:03,625+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   2024-01-11T23:10:08,624+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@b35f1bd}}} (38) key: 18 bytes
   2024-01-11T23:10:08,630+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   2024-01-11T23:10:13,621+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@33f29b11}}} (38) key: 18 bytes
   2024-01-11T23:10:13,626+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   2024-01-11T23:10:17,592+0800 [SyncThread-9-1] INFO  org.apache.bookkeeper.bookie.Journal - garbage collected journal 18cf7e07713.txn
   2024-01-11T23:10:18,624+0800 [pulsar-ph-kafka-224-11] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync GroupMetadata{groupId=my-group, generation=22, protocolType=Optional[consumer], state=Stable, members={console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9=MemberMetadata{memberId=console-consumer-b18a2e15-6cc8-470a-bf43-d35808ecc6f9, clientId=console-consumer, clientHost=/127.0.0.1:63244, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=java.util.stream.ReferencePipeline$Head@76848866}}} (38) key: 18 bytes
   2024-01-11T23:10:18,631+0800 [pulsar-client-io-87-3] INFO  io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager - XYZ storeOffsetMessageAsync successfully 38 key: 18 bytes
   ```
   
   The logs above are all of the INFO logs that precede the timeout error. Messages were sent once every 5 seconds, and every earlier send succeeded.
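   The 5-second cadence can be checked by diffing consecutive `XYZ storeOffsetMessageAsync GroupMetadata...` timestamps. The sketch below uses `java.time`. The timestamps are copied from the logs above with the date and `+0800` offset dropped, since all lines share them. `SendIntervals` is a hypothetical helper name.

   ```java
   import java.time.Duration;
   import java.time.LocalTime;
   import java.time.format.DateTimeFormatter;
   import java.util.ArrayList;
   import java.util.List;

   public class SendIntervals {
       static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

       /** Milliseconds between each pair of consecutive timestamps (all on the same day). */
       static List<Long> intervalsMillis(List<String> times) {
           List<Long> gaps = new ArrayList<>();
           for (int i = 1; i < times.size(); i++) {
               LocalTime prev = LocalTime.parse(times.get(i - 1), TS);
               LocalTime cur = LocalTime.parse(times.get(i), TS);
               gaps.add(Duration.between(prev, cur).toMillis());
           }
           return gaps;
       }

       public static void main(String[] args) {
           // Send attempts on __consumer_offsets-partition-38, including the one that timed out.
           List<String> sends = List.of(
                   "23:09:58,622", "23:10:03,620", "23:10:08,624",
                   "23:10:13,621", "23:10:18,624", "23:10:23,628");
           System.out.println(intervalsMillis(sends)); // [4998, 5004, 4997, 5003, 5004]
       }
   }
   ```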
   
   I printed the sequence id of the `TimeoutException` and found that it is -1:
   
   ```
   2024-01-11T23:10:23,631+0800 [pulsar-timer-118-1] WARN  io.streamnative.pulsar.handlers.kop.coordinator.CompactedPartitionedTopic - Timeout when sending: The producer standalone-2804-81 can not send message to the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1
   ```
   
   This means the send failed here: https://github.com/apache/pulsar/blob/176bdeacd309e8c1e49358987a1946abd30ba34a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2032
   
   If the message had already been wrapped in an `OpSendMsg`, its sequence id would be a real (non-negative) value rather than -1.
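   To find other failures like this in the full logs, you can filter on the `seq id:` suffix that the `whenComplete` callback logs. The sketch below assumes that log format. `SeqIdCheck` and its methods are hypothetical helpers. Reading -1 as "never became an `OpSendMsg`" follows the reasoning in this report, not a documented Pulsar contract.

   ```java
   import java.util.OptionalLong;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public class SeqIdCheck {
       // Matches the "seq id: <n>" suffix written by the callback's log.warn(...).
       private static final Pattern SEQ_ID = Pattern.compile("seq id: (-?\\d+)");

       /** Extracts the logged sequence id, or empty if the line has none. */
       static OptionalLong seqId(String logLine) {
           Matcher m = SEQ_ID.matcher(logLine);
           return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
       }

       /** Assumption from this report: -1 means the message failed before becoming an OpSendMsg. */
       static boolean failedBeforeOpSendMsg(long seqId) {
           return seqId == -1;
       }

       public static void main(String[] args) {
           String line = "Timeout when sending: The producer standalone-2804-81 can not send message to "
                   + "the topic public/__kafka/__consumer_offsets-partition-38 within given timeout, seq id: -1";
           long id = seqId(line).orElseThrow();
           System.out.println(id + " -> failed before OpSendMsg: " + failedBeforeOpSendMsg(id)); // -1 -> failed before OpSendMsg: true
       }
   }
   ```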
   
   
[full-logs.tar.gz](https://github.com/apache/pulsar/files/13905199/full-logs.tar.gz)
   
   I suspect something is wrong with the logic refactored in https://github.com/apache/pulsar/pull/14185.
   
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

