blankensteiner commented on code in PR #267: URL: https://github.com/apache/pulsar-dotpulsar/pull/267#discussion_r2115357445
########## src/DotPulsar/Internal/ConsumerChannel.cs: ########## @@ -80,58 +80,66 @@ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellatio await SendFlow(cancellationToken).ConfigureAwait(false); _sendWhenZero--; - - var message = _batchHandler.GetNext(); - - if (message is not null) - return message; - - var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - - if (!messagePackage.ValidateMagicNumberAndChecksum()) + try { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch, cancellationToken).ConfigureAwait(false); - continue; - } + var message = _batchHandler.GetNext(); - var metadataSize = messagePackage.GetMetadataSize(); - var metadata = messagePackage.ExtractMetadata(metadataSize); - var data = messagePackage.ExtractData(metadataSize); + if (message is not null) + return message; - if (metadata.Compression != CompressionType.None) - { - var decompressor = _decompressors[(int) metadata.Compression]; - if (decompressor is null) - throw new CompressionException($"Support for {metadata.Compression} compression was not found"); + var messagePackage = await _queue.Dequeue(cancellationToken).ConfigureAwait(false); - try - { - data = decompressor.Decompress(data, (int) metadata.UncompressedSize); - } - catch + if (!messagePackage.ValidateMagicNumberAndChecksum()) { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError, cancellationToken).ConfigureAwait(false); + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.ChecksumMismatch).ConfigureAwait(false); continue; } - } - var messageId = messagePackage.MessageId; - var redeliveryCount = messagePackage.RedeliveryCount; + var metadataSize = messagePackage.GetMetadataSize(); + var metadata = messagePackage.ExtractMetadata(metadataSize); + var data = messagePackage.ExtractData(metadataSize); - if (metadata.ShouldSerializeNumMessagesInBatch()) - { - try + if (metadata.Compression != CompressionType.None) { - return _batchHandler.Add(messageId, redeliveryCount, metadata, data); + var decompressor = _decompressors[(int) metadata.Compression]; + if (decompressor is null) + throw new CompressionException($"Support for {metadata.Compression} compression was not found"); + + try + { + data = decompressor.Decompress(data, (int) metadata.UncompressedSize); + } + catch + { + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.DecompressionError).ConfigureAwait(false); + continue; + } } - catch + + var messageId = messagePackage.MessageId; + var redeliveryCount = messagePackage.RedeliveryCount; + + if (metadata.ShouldSerializeNumMessagesInBatch()) { - await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError, cancellationToken).ConfigureAwait(false); - continue; + try + { + return _batchHandler.Add(messageId, redeliveryCount, metadata, data); + } + catch + { + await RejectPackage(messagePackage, CommandAck.ValidationErrorType.BatchDeSerializeError).ConfigureAwait(false); + continue; + } } - } - return _messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, metadata); + return _messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, metadata); + } + catch (Exception e) when (e is not CompressionException) Review Comment: Would it make sense to only increment _sendWhenZero again if the exception is a OperationCanceledException AND the cancellation token has been cancelled? ########## src/DotPulsar/Internal/ConsumerChannel.cs: ########## @@ -224,7 +232,9 @@ private async Task RejectPackage(MessagePackage messagePackage, CommandAck.Valid ack.MessageIds.Add(messagePackage.MessageId); - await Send(ack, cancellationToken).ConfigureAwait(false); + // Don't allow RejectPackage to be cancelled since it only happens in the receive loop after messages + // have already been dequeued + await Send(ack, CancellationToken.None).ConfigureAwait(false); Review Comment: This would block the user task even if they cancelled their cancellation token. I get your point, but then we need to start a task (that we are not awaiting) to send the rejection. ########## tests/DotPulsar.Tests/Internal/FlowTests.cs: ########## @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal; + +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using System.Buffers; +using System.Text.Json; +using Xunit.Abstractions; + +[Collection("Integration"), Trait("Category", "Integration")] +public sealed class FlowTests Review Comment: Let's move and rename the tests in this class to follow our conventions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org