andrekramer1 opened a new issue #14242:
URL: https://github.com/apache/pulsar/issues/14242
With acknowledgmentAtBatchIndexLevelEnabled = false (the default) and the Web
Socket consumer builder set to builder.enableBatchIndexAcknowledgment(false)
(again the default), and with all messages acknowledged by the web socket
client, we see some messages not properly acknowledged and deleted from the
subscription. If the subscribing web socket client disconnects and then
reconnects to the Web Socket tier, it sees those messages again, and it sees
them again on every subsequent reconnect.
The bug can be sidestepped by setting both configs above to true so that
"batch index level acks" are fully enabled.
If batch index acks are not enabled in the Web Socket tier's consumer
(regardless of the broker setting), the following happens in the Web Socket
servlet:
Each message acknowledgement is deserialized and the resulting message id
object gets its own bitset. So even if an ack is for the same batch as others,
it has a separate bitset, and batches with more than 1 message are never
acknowledged to the broker (none of the bitsets ever becomes fully unset: each
has just one bit unset out of the expected batch size).
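
The effect described above can be reproduced with plain java.util.BitSet. This is a minimal sketch, not Pulsar code: BatchAckModel and its methods are hypothetical names, and the only assumption is the behaviour stated in this issue (each ack clears one bit, and the batch is acknowledged to the broker only when the bitset is empty).

```java
import java.util.BitSet;

// Hypothetical model of batch ack tracking; not a Pulsar class.
final class BatchAckModel {
    // A new bitset with one set bit per message in the batch.
    static BitSet freshBitSet(int batchSize) {
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);
        return bits;
    }

    // Acks one index and reports whether the whole batch is now acked.
    static boolean ackIndex(BitSet bits, int index) {
        bits.clear(index);
        return bits.isEmpty();
    }

    // Models the reported bug: every ack arrives with its own bitset.
    static boolean ackAllWithSeparateBitSets(int batchSize) {
        boolean complete = false;
        for (int i = 0; i < batchSize; i++) {
            complete = ackIndex(freshBitSet(batchSize), i);
        }
        return complete;
    }

    // Models the intended behaviour: all acks of a batch share one bitset.
    static boolean ackAllWithSharedBitSet(int batchSize) {
        BitSet shared = freshBitSet(batchSize);
        boolean complete = false;
        for (int i = 0; i < batchSize; i++) {
            complete = ackIndex(shared, i);
        }
        return complete;
    }
}
```

With a batch size of 2, the separate-bitset variant never reports completion while the shared variant does. With a batch size of 1 both complete, which matches the observation that only batches of more than one message get stuck.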
We attempted a fix that makes the incoming (to Web Socket tier) message ids
share bitsets if in the same batch (after message ack
deserialization). This seems to work and is similar to what happens in the
batch index case, where the bitsets are duplicated and shared at the batch
level. The suggested fix is outlined below.
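
As a JDK-only illustration of the sharing idea (not the patch itself), the sketch below keeps one bitset per (ledgerId, entryId) pair in a ConcurrentHashMap. SharedBitSetRegistry and its methods are hypothetical names. Unlike the suggested fix, it does the lookup, the bit clear and the removal inside a single compute call, which is a simplification chosen here so that the map entry and the bitset are updated atomically.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that shares one bitset per batch; not a Pulsar class.
final class SharedBitSetRegistry {
    // Key is (ledgerId, entryId), which identifies a batch.
    private final ConcurrentHashMap<Map.Entry<Long, Long>, BitSet> ackers =
            new ConcurrentHashMap<>();

    // Records an ack for one batch index.
    // Returns true when every index of that batch has now been acked.
    boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
        Map.Entry<Long, Long> key = new SimpleImmutableEntry<>(ledgerId, entryId);
        boolean[] complete = {false};
        ackers.compute(key, (k, bits) -> {
            if (bits == null) {
                // First ack seen for this batch: start with all bits set.
                bits = new BitSet(batchSize);
                bits.set(0, batchSize);
            }
            bits.clear(batchIndex);
            if (bits.isEmpty()) {
                complete[0] = true;
                return null; // batch fully acked, drop the entry
            }
            return bits;
        });
        return complete[0];
    }

    // Number of batches that still have unacked indexes.
    int pendingBatches() {
        return ackers.size();
    }
}
```

Returning null from compute removes the entry, so the map does not grow with fully acknowledged batches.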
Also, for the true/true case (batch index acks enabled) we did spot a data
race: a bit set in the shared bitset can be missed because the flush has just
read the bits from, and then removed, the same bitset in the concurrent hash
map. We would advise always taking the read-write lock that is already in
PersistentAcknowledgmentsGroupingTracker (but is currently used only if
isAckReceiptEnabled(consumer.getClientCnx())). Occasional dropped acks could
otherwise result.
**To Reproduce**
We had 2 producers sending in messages (say 10) that ended up in batches of
usually 2 messages, and one Web Socket subscribing consumer connected to the
Web Proxy tier. It would see all messages, but after disconnecting and
reconnecting it would see some messages again. These could not be consumed via
the web socket tier but could be consumed with a normal/binary Pulsar client
(say bin/pulsar-client), and we noted that the unconsumed messages were in
batches of more than one. Ledger(s) also could not be deleted while the
messages were unconsumed, so even if the application could handle the replays
we quickly ran out of "bookie" storage space.
**Expected behavior**
All messages that are acked should be marked as consumed in the subscription,
with some occasional replays allowed (at-least-once semantics).
**Desktop (please complete the following information):**
- All deployed in Kubernetes. Private build of 2.8.0.
**Additional context**
We observed the problem in 2.8.0 but it should be similar for later versions
as the code has not changed in the Web Socket or parts of the Pulsar client
used inside the WebSocket servlet.
-------------------------------------------------------------------------
Suggested fix:
ConsumerHandler.java
current:
    MessageId msgId = MessageId.fromByteArrayWithTopic(
            Base64.getDecoder().decode(command.messageId), topic.toString());
    consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
with fix:
    MessageId msgId = MessageId.fromByteArrayWithTopic(
            Base64.getDecoder().decode(message), topic.toString());
    if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
        BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
        BatchMessageAcker acker = batchMsgId.getAcker();
        int size = acker.getBatchSize();
        BitSet bitset = ackers.computeIfAbsent(
                Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()), key -> {
                    BitSet bitSet = new BitSet(size);
                    bitSet.set(0, size);
                    return bitSet;
                });
        acker.setBitSet(bitset);
    }
    consumer.acknowledgeAsync(msgId).thenAccept(consumer -> {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Ack'ed asynchronously", msgId);
        }
        if (!enableBatchIndexAcknowledgment && msgId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
            ackers.computeIfPresent(
                    Pair.of(batchMsgId.getLedgerId(), batchMsgId.getEntryId()), (key, value) -> {
                        if (value.isEmpty()) {
                            // infer ack was sent so drop bitset from map.
                            return null;
                        }
                        return value;
                    });
        }
        numMsgsAcked.increment();
    });
where:
    // class member
    private final ConcurrentHashMap<Pair<Long, Long>, BitSet> ackers = new ConcurrentHashMap<>();
    // could be a new config for Web Socket proxy to set
    // consumer builder.enableBatchIndexAcknowledgment in Web Socket servlet.
    private final boolean enableBatchIndexAcknowledgment = false;
--------------------------------------------------------------------------
Race condition:
There are currently 2 code segments that take the lock only conditionally in
PersistentAcknowledgmentsGroupingTracker.java:
    if (isAckReceiptEnabled(consumer.getClientCnx())) {
        ...
        this.lock.readLock().lock();
        ...
We suggest always taking the lock in doIndividualAck and flush (which is
called from a scheduled task).
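
A simplified sketch of the unconditional locking suggested here, using only JDK types. GroupingTrackerSketch, addAck and flush are hypothetical names, and a queue of ids stands in for the tracker's real pending-ack structures. Ack paths take the read lock, so they can still run concurrently with each other, while flush takes the write lock so that no ack can land between reading the pending state and removing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for an ack grouping tracker; not Pulsar code.
final class GroupingTrackerSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentLinkedQueue<Long> pendingAcks = new ConcurrentLinkedQueue<>();

    // Ack path: always takes the read lock, so many acks may proceed in
    // parallel but never while a flush is in progress.
    void addAck(long messageId) {
        lock.readLock().lock();
        try {
            pendingAcks.add(messageId);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Flush path: always takes the write lock, so the copy and the clear
    // below are seen as one step by the ack path.
    List<Long> flush() {
        lock.writeLock().lock();
        try {
            List<Long> batch = new ArrayList<>(pendingAcks);
            pendingAcks.clear();
            return batch;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Without the lock, an ack added after the copy but before the clear would be discarded without ever being flushed, which is the kind of dropped ack described for the true/true case.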