BewareMyPower commented on code in PR #17795:
URL: https://github.com/apache/pulsar/pull/17795#discussion_r991511684
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -519,35 +519,42 @@ public void sendAsync(Message<?> message, SendCallback
callback) {
// chunked message also sent individually so, try to acquire
send-permits
for (int i = 0; i < (totalChunks - 1); i++) {
- if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The
memory was already reserved */)) {
+ if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback,
message.getSequenceId(),
+ 0 /* The memory was already reserved */)) {
client.getMemoryLimitController().releaseMemory(uncompressedSize);
semaphoreRelease(i + 1);
return;
}
}
try {
- synchronized (this) {
- int readStartIndex = 0;
- String uuid = totalChunks > 1 ? String.format("%s-%d",
producerName, sequenceId) : null;
- ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ?
ChunkedMessageCtx.get(totalChunks) : null;
- byte[] schemaVersion = totalChunks > 1 &&
msg.getMessageBuilder().hasSchemaVersion()
- ? msg.getMessageBuilder().getSchemaVersion() : null;
- byte[] orderingKey = totalChunks > 1 &&
msg.getMessageBuilder().hasOrderingKey()
- ? msg.getMessageBuilder().getOrderingKey() : null;
- for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
- // Need to reset the schemaVersion, because the
schemaVersion is based on a ByteBuf object in
- // `MessageMetadata`, if we want to re-serialize the
`SEND` command using a same `MessageMetadata`,
- // we need to reset the ByteBuf of the schemaVersion in
`MessageMetadata`, I think we need to
- // reset `ByteBuf` objects in `MessageMetadata` after call
the method `MessageMetadata#writeTo()`.
- if (chunkId > 0) {
- if (schemaVersion != null) {
-
msg.getMessageBuilder().setSchemaVersion(schemaVersion);
- }
- if (orderingKey != null) {
-
msg.getMessageBuilder().setOrderingKey(orderingKey);
- }
+ int readStartIndex = 0;
+ String uuid = totalChunks > 1 ? String.format("%s-%d",
producerName, sequenceId) : null;
+ ChunkedMessageCtx chunkedMessageCtx = totalChunks > 1 ?
ChunkedMessageCtx.get(totalChunks) : null;
+ byte[] schemaVersion = totalChunks > 1 &&
msg.getMessageBuilder().hasSchemaVersion()
+ ? msg.getMessageBuilder().getSchemaVersion() : null;
+ byte[] orderingKey = totalChunks > 1 &&
msg.getMessageBuilder().hasOrderingKey()
+ ? msg.getMessageBuilder().getOrderingKey() : null;
+ for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+ // Need to reset the schemaVersion, because the schemaVersion
is based on a ByteBuf object in
+ // `MessageMetadata`, if we want to re-serialize the `SEND`
command using a same `MessageMetadata`,
+ // we need to reset the ByteBuf of the schemaVersion in
`MessageMetadata`, I think we need to
+ // reset `ByteBuf` objects in `MessageMetadata` after call the
method `MessageMetadata#writeTo()`.
+ if (chunkId > 0) {
+ if (schemaVersion != null) {
+
msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+ }
+ if (orderingKey != null) {
+ msg.getMessageBuilder().setOrderingKey(orderingKey);
}
+ }
+ if (chunkId > 0 && conf.isBlockIfQueueFull() &&
!canEnqueueRequest(callback,
+ message.getSequenceId(), 0 /* The memory was already
reserved */)) {
+
client.getMemoryLimitController().releaseMemory(uncompressedSize -
readStartIndex);
+ semaphoreRelease(totalChunks - chunkId);
+ return;
+ }
Review Comment:
`canEnqueueRequest` always return true when `conf.isBlockIfQueueFull()`
returns true. So it could never go into this if block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]