andrekramer1 opened a new issue #14242:
URL: https://github.com/apache/pulsar/issues/14242


   With acknowledgmentAtBatchIndexLevelEnabled = false (the broker default) and the 
Web Socket consumer builder set to builder.enableBatchIndexAcknowledgment(false) 
(again the default), and with all messages acknowledged by the web socket client, 
we see some messages never properly acknowledged and deleted from the 
subscription. If the subscribing web socket client disconnects and then 
reconnects to the Web Socket tier it will see those messages again, and it keeps 
seeing them on every subsequent reconnect.
   
   The bug can be sidestepped by setting both configs above to true so that 
"batch index level acks" are fully enabled. 
   If batch index acknowledgment is not enabled in the Web Socket tier's consumer 
(regardless of the broker setting), then the following happens in the Web Socket 
servlet:
   
   A message acknowledgement is deserialized and the resulting message id object 
gets its own bitset. So even if an ack is for the same batch as other acks, it 
carries a separate bitmap, and batches with more than one message are never 
acknowledged to the broker: each per-ack bitset only ever has one bit cleared 
out of the expected batch size, so no bitset ever becomes fully unset.
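   To make the failure mode concrete, here is a minimal, self-contained sketch; 
plain java.util.BitSet stands in for the per-message-id BatchMessageAcker state, 
and the batch size of 2 is illustrative:

    import java.util.BitSet;

    public class SeparateBitsetDemo {
        public static void main(String[] args) {
            int batchSize = 2;

            // Buggy behaviour: each deserialized ack gets its own bitset.
            BitSet ackForIndex0 = newBatchBitSet(batchSize);
            BitSet ackForIndex1 = newBatchBitSet(batchSize);
            ackForIndex0.clear(0); // ack of batch index 0
            ackForIndex1.clear(1); // ack of batch index 1
            // Neither bitset drains, so the batch is never acked to the broker.
            System.out.println(ackForIndex0.isEmpty()); // false
            System.out.println(ackForIndex1.isEmpty()); // false

            // With a shared bitset (the fix below) the batch drains as expected.
            BitSet shared = newBatchBitSet(batchSize);
            shared.clear(0); // ack of batch index 0
            shared.clear(1); // ack of batch index 1
            System.out.println(shared.isEmpty()); // true -> ack can be sent
        }

        private static BitSet newBatchBitSet(int size) {
            BitSet bitSet = new BitSet(size);
            bitSet.set(0, size); // every message in the batch starts unacked
            return bitSet;
        }
    }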
   
   We attempted a fix that makes the incoming (to the Web Socket tier) message 
ids share a bitset if they belong to the same batch (after message ack 
deserialization). This seems to work and is similar to what happens in the 
batch index case, where the bitsets are duplicated and shared at the batch 
level. The suggested fix is outlined below.
   
   Also, for the true/true case (batch index acks enabled) we did spot a data 
race: a bit set in the shared bitmap can be missed because the flush has just 
read the bits from, and then removed, that same bitset in the concurrent hash 
map. We would advise using the read-write lock that already exists in 
PersistentAcknowledgmentsGroupingTracker (but is currently taken only if 
isAckReceiptEnabled(consumer.getClientCnx()) is true). Occasional dropped acks 
could otherwise result.
   
   
   **To Reproduce**
   
   We had 2 producers sending messages (say 10) that ended up in batches of 
usually 2 messages, and one Web Socket subscribing consumer connected to the 
Web Socket proxy tier. It would see all messages, but after disconnecting and 
re-connecting it would see some messages again. These could not be consumed via 
the web socket tier but could be consumed with a normal/binary Pulsar client 
(say bin/pulsar-client), and we noted that the unconsumed messages were in 
batches of more than one. The ledger(s) could also not be deleted while the 
messages remained unconsumed, so even if the application could handle the 
replays we quickly ran out of "bookie" storage space.
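   For reference, a producer configured along the following lines will reliably 
produce such small multi-message batches (the service URL, topic name and exact 
batching values here are illustrative, not our actual settings):

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class BatchingProducerDemo {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // illustrative URL
                    .build();
            Producer<byte[]> producer = client.newProducer()
                    .topic("my-topic") // illustrative topic
                    .enableBatching(true)
                    .batchingMaxMessages(2) // batches of up to 2 messages
                    .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                    .create();
            for (int i = 0; i < 10; i++) {
                producer.sendAsync(("msg-" + i).getBytes());
            }
            producer.flush(); // push out any partially filled batch
            producer.close();
            client.close();
        }
    }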
   
   **Expected behavior**
   All messages that are acked should be marked as consumed in the subscription, 
with some occasional replays allowed (at-least-once semantics). 
   
   **Desktop (please complete the following information):**
    - All deployed in Kubernetes. Private build of 2.8.0.
   
   **Additional context**
   We observed the problem in 2.8.0 but it should be similar in later versions, 
as the relevant code has not changed in the Web Socket tier or in the parts of 
the Pulsar client used inside the WebSocket servlet.
   
   -------------------------------------------------------------------------
   Suggested fix: 
   
   ConsumerHandler.java 
   
   current:
        MessageId msgId = MessageId.fromByteArrayWithTopic(
                Base64.getDecoder().decode(command.messageId), topic.toString());
        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
   
   with fix:
        MessageId msgId = MessageId.fromByteArrayWithTopic(
                Base64.getDecoder().decode(command.messageId), topic.toString());

        if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
            BatchMessageAcker acker = batchMsgId.getAcker();
            int size = acker.getBatchSize();
            // Share one bitset per (ledgerId, entryId), i.e. per batch, so acks
            // for different indexes of the same batch drain the same bitset.
            BitSet bitset = ackers.computeIfAbsent(
                    Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()), key -> {
                        BitSet bitSet = new BitSet(size);
                        bitSet.set(0, size); // every batch index starts unacked
                        return bitSet;
                    });
            acker.setBitSet(bitset);
        }

        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ack'ed asynchronously", msgId);
            }

            if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
                BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
                ackers.computeIfPresent(
                        Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()),
                        (key, value) -> {
                            if (value.isEmpty()) {
                                // Bitset drained, so we infer the ack was sent to
                                // the broker; drop the bitset from the map.
                                return null;
                            }
                            return value;
                        });
            }
            numMsgsAcked.increment();
        });
   
    where:
        // new ConsumerHandler class members
        private final ConcurrentHashMap<Pair<Long, Long>, BitSet> ackers = new ConcurrentHashMap<>();
        // could become a new Web Socket proxy config used to set
        // builder.enableBatchIndexAcknowledgment on the consumer builder in the servlet
        private final boolean enableBatchIndexAcknowledgment = false;
   
   --------------------------------------------------------------------------
   Race condition:
   
   There are currently 2 code segments in 
PersistentAcknowledgmentsGroupingTracker.java that take the lock only 
conditionally:

          if (isAckReceiptEnabled(consumer.getClientCnx())) {
                ...
                this.lock.readLock().lock();
          ... 

   We suggest always taking the lock in doIndividualAck and in flush (which is 
called on a scheduled task), as sketched below.
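   A minimal sketch of that change, with method bodies abbreviated (the shapes 
below paraphrase PersistentAcknowledgmentsGroupingTracker rather than quoting it 
exactly):

        private void doIndividualAck(BatchMessageIdImpl msgId) {
            this.lock.readLock().lock(); // take the lock unconditionally, not only
            try {                        // when ack receipts are enabled
                // ... record the ack in the shared batch bitset / pending set ...
            } finally {
                this.lock.readLock().unlock();
            }
        }

        public void flush() {
            this.lock.writeLock().lock(); // the writer excludes concurrent individual
            try {                         // acks, so no bit set between the read and
                                          // the removal of a bitset can be lost
                // ... read the accumulated acks, send them, then clear the state ...
            } finally {
                this.lock.writeLock().unlock();
            }
        }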
   

