eerhardt commented on a change in pull request #10527:
URL: https://github.com/apache/arrow/pull/10527#discussion_r684688085



##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs
##########
@@ -59,35 +59,41 @@ protected async ValueTask<RecordBatch> 
ReadRecordBatchAsync(CancellationToken ca
         {
             await ReadSchemaAsync().ConfigureAwait(false);
 
-            int messageLength = await ReadMessageLengthAsync(throwOnFullRead: 
false, cancellationToken)
-                .ConfigureAwait(false);
+            RecordBatch result = default;
+            Flatbuf.MessageHeader messageHeaderType = 
Flatbuf.MessageHeader.NONE;
 
-            if (messageLength == 0)
+            do
             {
-                // reached end
-                return null;
-            }
-
-            RecordBatch result = null;
-            await ArrayPool<byte>.Shared.RentReturnAsync(messageLength, async 
(messageBuff) =>
-            {
-                int bytesRead = await 
BaseStream.ReadFullBufferAsync(messageBuff, cancellationToken)
+                int messageLength = await 
ReadMessageLengthAsync(throwOnFullRead: false, cancellationToken)
                     .ConfigureAwait(false);
-                EnsureFullRead(messageBuff, bytesRead);
 
-                Flatbuf.Message message = 
Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff));
+                if (messageLength == 0)
+                {
+                    // reached end
+                    return null;
+                }
 
-                int bodyLength = checked((int)message.BodyLength);
+                await ArrayPool<byte>.Shared.RentReturnAsync(messageLength, 
async (messageBuff) =>
+                {
+                    int bytesRead = await 
BaseStream.ReadFullBufferAsync(messageBuff, cancellationToken)
+                        .ConfigureAwait(false);
+                    EnsureFullRead(messageBuff, bytesRead);
 
-                IMemoryOwner<byte> bodyBuffOwner = 
_allocator.Allocate(bodyLength);
-                Memory<byte> bodyBuff = bodyBuffOwner.Memory.Slice(0, 
bodyLength);
-                bytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, 
cancellationToken)
-                    .ConfigureAwait(false);
-                EnsureFullRead(bodyBuff, bytesRead);
+                    Flatbuf.Message message = 
Flatbuf.Message.GetRootAsMessage(CreateByteBuffer(messageBuff));
 
-                FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
-                result = CreateArrowObjectFromMessage(message, bodybb, 
bodyBuffOwner);
-            }).ConfigureAwait(false);
+                    int bodyLength = checked((int)message.BodyLength);
+                    messageHeaderType = message.HeaderType;
+
+                    IMemoryOwner<byte> bodyBuffOwner = 
_allocator.Allocate(bodyLength);
+                    Memory<byte> bodyBuff = bodyBuffOwner.Memory.Slice(0, 
bodyLength);
+                    bytesRead = await BaseStream.ReadFullBufferAsync(bodyBuff, 
cancellationToken)
+                        .ConfigureAwait(false);
+                    EnsureFullRead(bodyBuff, bytesRead);
+
+                    FlatBuffers.ByteBuffer bodybb = CreateByteBuffer(bodyBuff);
+                    result = CreateArrowObjectFromMessage(message, bodybb, 
bodyBuffOwner);
+                }).ConfigureAwait(false);
+            } while (messageHeaderType == 
Flatbuf.MessageHeader.DictionaryBatch);

Review comment:
       Would this be better as:
   
   ```suggestion
               } while (messageHeaderType != Flatbuf.MessageHeader.RecordBatch);
   ```
   ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to