blankensteiner commented on code in PR #267:
URL: https://github.com/apache/pulsar-dotpulsar/pull/267#discussion_r2115357445


##########
src/DotPulsar/Internal/ConsumerChannel.cs:
##########
@@ -80,58 +80,66 @@ public async ValueTask<IMessage<TMessage>> 
Receive(CancellationToken cancellatio
                     await SendFlow(cancellationToken).ConfigureAwait(false);
 
                 _sendWhenZero--;
-
-                var message = _batchHandler.GetNext();
-
-                if (message is not null)
-                    return message;
-
-                var messagePackage = await 
_queue.Dequeue(cancellationToken).ConfigureAwait(false);
-
-                if (!messagePackage.ValidateMagicNumberAndChecksum())
+                try
                 {
-                    await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.ChecksumMismatch, 
cancellationToken).ConfigureAwait(false);
-                    continue;
-                }
+                    var message = _batchHandler.GetNext();
 
-                var metadataSize = messagePackage.GetMetadataSize();
-                var metadata = messagePackage.ExtractMetadata(metadataSize);
-                var data = messagePackage.ExtractData(metadataSize);
+                    if (message is not null)
+                        return message;
 
-                if (metadata.Compression != CompressionType.None)
-                {
-                    var decompressor = _decompressors[(int) 
metadata.Compression];
-                    if (decompressor is null)
-                        throw new CompressionException($"Support for 
{metadata.Compression} compression was not found");
+                    var messagePackage = await 
_queue.Dequeue(cancellationToken).ConfigureAwait(false);
 
-                    try
-                    {
-                        data = decompressor.Decompress(data, (int) 
metadata.UncompressedSize);
-                    }
-                    catch
+                    if (!messagePackage.ValidateMagicNumberAndChecksum())
                     {
-                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.DecompressionError, 
cancellationToken).ConfigureAwait(false);
+                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.ChecksumMismatch).ConfigureAwait(false);
                         continue;
                     }
-                }
 
-                var messageId = messagePackage.MessageId;
-                var redeliveryCount = messagePackage.RedeliveryCount;
+                    var metadataSize = messagePackage.GetMetadataSize();
+                    var metadata = 
messagePackage.ExtractMetadata(metadataSize);
+                    var data = messagePackage.ExtractData(metadataSize);
 
-                if (metadata.ShouldSerializeNumMessagesInBatch())
-                {
-                    try
+                    if (metadata.Compression != CompressionType.None)
                     {
-                        return _batchHandler.Add(messageId, redeliveryCount, 
metadata, data);
+                        var decompressor = _decompressors[(int) 
metadata.Compression];
+                        if (decompressor is null)
+                            throw new CompressionException($"Support for 
{metadata.Compression} compression was not found");
+
+                        try
+                        {
+                            data = decompressor.Decompress(data, (int) 
metadata.UncompressedSize);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.DecompressionError).ConfigureAwait(false);
+                            continue;
+                        }
                     }
-                    catch
+
+                    var messageId = messagePackage.MessageId;
+                    var redeliveryCount = messagePackage.RedeliveryCount;
+
+                    if (metadata.ShouldSerializeNumMessagesInBatch())
                     {
-                        await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.BatchDeSerializeError, 
cancellationToken).ConfigureAwait(false);
-                        continue;
+                        try
+                        {
+                            return _batchHandler.Add(messageId, 
redeliveryCount, metadata, data);
+                        }
+                        catch
+                        {
+                            await RejectPackage(messagePackage, 
CommandAck.ValidationErrorType.BatchDeSerializeError).ConfigureAwait(false);
+                            continue;
+                        }
                     }
-                }
 
-                return _messageFactory.Create(messageId.ToMessageId(_topic), 
redeliveryCount, data, metadata);
+                    return 
_messageFactory.Create(messageId.ToMessageId(_topic), redeliveryCount, data, 
metadata);
+                }
+                catch (Exception e) when (e is not CompressionException)

Review Comment:
   Would it make sense to only increment _sendWhenZero again if the exception 
is a OperationCanceledException AND the cancellation token has been cancelled?



##########
src/DotPulsar/Internal/ConsumerChannel.cs:
##########
@@ -224,7 +232,9 @@ private async Task RejectPackage(MessagePackage 
messagePackage, CommandAck.Valid
 
         ack.MessageIds.Add(messagePackage.MessageId);
 
-        await Send(ack, cancellationToken).ConfigureAwait(false);
+        // Don't allow RejectPackage to be cancelled since it only happens in 
the receive loop after messages
+        // have already been dequeued
+        await Send(ack, CancellationToken.None).ConfigureAwait(false);

Review Comment:
   This would block the user task even if they cancelled their cancellation 
token. I get your point, but then we need to start a task (that we are not 
awaiting) to send the rejection. 



##########
tests/DotPulsar.Tests/Internal/FlowTests.cs:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using System.Buffers;
+using System.Text.Json;
+using Xunit.Abstractions;
+
+[Collection("Integration"), Trait("Category", "Integration")]
+public sealed class FlowTests

Review Comment:
   Let's move and rename the tests in this class to follow our conventions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to