eerhardt commented on a change in pull request #10527: URL: https://github.com/apache/arrow/pull/10527#discussion_r684688085
########## File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs ########## @@ -59,35 +59,41 @@ protected async ValueTask<RecordBatch> ReadRecordBatchAsync(CancellationToken ca { await ReadSchemaAsync().ConfigureAwait(false); - int messageLength = await ReadMessageLengthAsync(throwOnFullRead: false, cancellationToken) - .ConfigureAwait(false); + RecordBatch result = default; + Flatbuf.MessageHeader messageHeaderType = Flatbuf.MessageHeader.NONE; - if (messageLength == 0) + do { - // reached end - return null; - } - - RecordBatch result = null; - await ArrayPool<byte>.Shared.RentReturnAsync(messageLength, async (messageBuff) => - { - int bytesRead = await BaseStream.ReadFullBufferAsync(messageBuff, cancellationToken) + int messageLength = await ReadMessageLengthAsync(throwOnFullRead: false, cancellationToken) .ConfigureAwait(false); - EnsureFullRead(messageBuff, bytesRead); - Flatbuf.Message message = Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff)); + if (messageLength == 0) + { + // reached end + return null; + } - int bodyLength = checked((int)message.BodyLength); + await ArrayPool<byte>.Shared.RentReturnAsync(messageLength, async (messageBuff) => + { + int bytesRead = await BaseStream.ReadFullBufferAsync(messageBuff, cancellationToken) + .ConfigureAwait(false); + EnsureFullRead(messageBuff, bytesRead); - IMemoryOwner<byte> bodyBuffOwner = _allocator.Allocate(bodyLength); - Memory<byte> bodyBuff = bodyBuffOwner.Memory.Slice(0, bodyLength); - bytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, cancellationToken) - .ConfigureAwait(false); - EnsureFullRead(bodyBuff, bytesRead); + Flatbuf.Message message = Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff)); - FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff); - result = CreateArrowObjectFromMessage(message, bodybb, bodyBuffOwner); - }).ConfigureAwait(false); + int bodyLength = checked((int)message.BodyLength); + messageHeaderType = message.HeaderType; + + IMemoryOwner<byte> bodyBuffOwner = _allocator.Allocate(bodyLength); + Memory<byte> bodyBuff = bodyBuffOwner.Memory.Slice(0, bodyLength); + bytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, cancellationToken) + .ConfigureAwait(false); + EnsureFullRead(bodyBuff, bytesRead); + + FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff); + result = CreateArrowObjectFromMessage(message, bodybb, bodyBuffOwner); + }).ConfigureAwait(false); + } while (messageHeaderType == Flatbuf.MessageHeader.DictionaryBatch); Review comment: Would this be better as: ```suggestion } while (messageHeaderType != Flatbuf.MessageHeader.RecordBatch); ``` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org