adamreeve commented on code in PR #46190:
URL: https://github.com/apache/arrow/pull/46190#discussion_r2103646566


##########
csharp/src/Apache.Arrow/Ipc/ITryCompressionCodec.cs:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    public interface ITryCompressionCodec : ICompressionCodec
+    {
+        /// <summary>
+        /// try to write compressed data to span
+        /// </summary>
+        /// <param name="source">The data to compress</param>
+        /// <param name="destination">Span to write compressed data to</param>

Review Comment:
   ```suggestion
           /// <param name="destination">Memory to write compressed data to</param>
   ```



##########
csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs:
##########
@@ -487,82 +493,114 @@ private Buffer CreateBitmapBuffer(ArrowBuffer buffer, 
int offset, int length)
                         BitUtility.SetBit(outputSpan, i, 
BitUtility.GetBit(inputSpan, offset + i));
                     }
 
-                    return CreateBuffer(memoryOwner.Memory);
+                    return CreateBuffer(memoryOwner);
                 }
             }
 
-            private Buffer CreateSlicedBuffer<T>(ArrowBuffer buffer, int 
offset, int length)
+            private Buffer CreateSlicedBuffer<T>(ArrowBuffer buffer, int 
offset, int length, bool owned)
                 where T : struct
             {
-                return CreateSlicedBuffer(buffer, Unsafe.SizeOf<T>(), offset, 
length);
+                return CreateSlicedBuffer(buffer, Unsafe.SizeOf<T>(), offset, 
length, owned);
             }
 
-            private Buffer CreateSlicedBuffer(ArrowBuffer buffer, int 
itemSize, int offset, int length)
+            private Buffer CreateSlicedBuffer(ArrowBuffer buffer, int 
itemSize, int offset, int length, bool owned)
             {
                 var byteLength = length * itemSize;
                 var paddedLength = CalculatePaddedBufferLength(byteLength);
                 if (offset != 0 || paddedLength < buffer.Length)
                 {
                     var byteOffset = offset * itemSize;
                     var sliceLength = Math.Min(paddedLength, buffer.Length - 
byteOffset);
-                    return CreateBuffer(buffer.Memory.Slice(byteOffset, 
sliceLength));
+                    return CreateBuffer(buffer.Memory.Slice(byteOffset, 
sliceLength),
+                        owned ? buffer : null);
                 }
 
-                return CreateBuffer(buffer.Memory);
+                return CreateBuffer(buffer, owned);
             }
 
-            private Buffer CreateBuffer(ArrowBuffer buffer)
+            private Buffer CreateBuffer(ArrowBuffer buffer, bool locallyOwned)
             {
-                return CreateBuffer(buffer.Memory);
+                if (locallyOwned)
+                {
+                    // this buffer is locally owned, we can dispose it
+                    return CreateBuffer(buffer.Memory, buffer);
+                }
+                return CreateBuffer(buffer.Memory, null);
             }
 
-            private Buffer CreateBuffer(ReadOnlyMemory<byte> buffer)
+            private Buffer CreateBuffer(IMemoryOwner<byte>  bufferOwner)
+            {
+                return CreateBuffer(bufferOwner.Memory, bufferOwner);
+            }
+
+            private Buffer CreateBuffer(ReadOnlyMemory<byte> buffer, 
IDisposable localBufferOwner)
             {
                 int offset = TotalLength;
                 const int UncompressedLengthSize = 8;
 
                 ReadOnlyMemory<byte> bufferToWrite;
+                IMemoryOwner<byte> bufferOwner=null;
                 if (_compressionCodec == null)
                 {
                     bufferToWrite = buffer;
                 }
                 else if (buffer.Length == 0)
                 {
                     // Write zero length and skip compression
-                    var uncompressedLengthBytes = 
_allocator.Allocate(UncompressedLengthSize);
-                    
BinaryPrimitives.WriteInt64LittleEndian(uncompressedLengthBytes.Memory.Span, 0);
-                    bufferToWrite = uncompressedLengthBytes.Memory;
+                    bufferOwner = _allocator.Allocate(UncompressedLengthSize);
+                    
BinaryPrimitives.WriteInt64LittleEndian(bufferOwner.Memory.Span, 0);
+                    bufferToWrite = bufferOwner.Memory.Slice(0, 
UncompressedLengthSize);
+                    // the local source buffer owner can be disposed, it's 
memory is no longer needed

Review Comment:
   ```suggestion
                       // the local source buffer owner can be disposed, its memory is no longer needed
   ```



##########
csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs:
##########
@@ -487,82 +493,114 @@ private Buffer CreateBitmapBuffer(ArrowBuffer buffer, 
int offset, int length)
                         BitUtility.SetBit(outputSpan, i, 
BitUtility.GetBit(inputSpan, offset + i));
                     }
 
-                    return CreateBuffer(memoryOwner.Memory);
+                    return CreateBuffer(memoryOwner);
                 }
             }
 
-            private Buffer CreateSlicedBuffer<T>(ArrowBuffer buffer, int 
offset, int length)
+            private Buffer CreateSlicedBuffer<T>(ArrowBuffer buffer, int 
offset, int length, bool owned)
                 where T : struct
             {
-                return CreateSlicedBuffer(buffer, Unsafe.SizeOf<T>(), offset, 
length);
+                return CreateSlicedBuffer(buffer, Unsafe.SizeOf<T>(), offset, 
length, owned);
             }
 
-            private Buffer CreateSlicedBuffer(ArrowBuffer buffer, int 
itemSize, int offset, int length)
+            private Buffer CreateSlicedBuffer(ArrowBuffer buffer, int 
itemSize, int offset, int length, bool owned)
             {
                 var byteLength = length * itemSize;
                 var paddedLength = CalculatePaddedBufferLength(byteLength);
                 if (offset != 0 || paddedLength < buffer.Length)
                 {
                     var byteOffset = offset * itemSize;
                     var sliceLength = Math.Min(paddedLength, buffer.Length - 
byteOffset);
-                    return CreateBuffer(buffer.Memory.Slice(byteOffset, 
sliceLength));
+                    return CreateBuffer(buffer.Memory.Slice(byteOffset, 
sliceLength),
+                        owned ? buffer : null);
                 }
 
-                return CreateBuffer(buffer.Memory);
+                return CreateBuffer(buffer, owned);
             }
 
-            private Buffer CreateBuffer(ArrowBuffer buffer)
+            private Buffer CreateBuffer(ArrowBuffer buffer, bool locallyOwned)
             {
-                return CreateBuffer(buffer.Memory);
+                if (locallyOwned)
+                {
+                    // this buffer is locally owned, we can dispose it
+                    return CreateBuffer(buffer.Memory, buffer);
+                }
+                return CreateBuffer(buffer.Memory, null);
             }
 
-            private Buffer CreateBuffer(ReadOnlyMemory<byte> buffer)
+            private Buffer CreateBuffer(IMemoryOwner<byte>  bufferOwner)
+            {
+                return CreateBuffer(bufferOwner.Memory, bufferOwner);
+            }
+
+            private Buffer CreateBuffer(ReadOnlyMemory<byte> buffer, 
IDisposable localBufferOwner)
             {
                 int offset = TotalLength;
                 const int UncompressedLengthSize = 8;
 
                 ReadOnlyMemory<byte> bufferToWrite;
+                IMemoryOwner<byte> bufferOwner=null;
                 if (_compressionCodec == null)
                 {
                     bufferToWrite = buffer;
                 }
                 else if (buffer.Length == 0)
                 {
                     // Write zero length and skip compression
-                    var uncompressedLengthBytes = 
_allocator.Allocate(UncompressedLengthSize);
-                    
BinaryPrimitives.WriteInt64LittleEndian(uncompressedLengthBytes.Memory.Span, 0);
-                    bufferToWrite = uncompressedLengthBytes.Memory;
+                    bufferOwner = _allocator.Allocate(UncompressedLengthSize);
+                    
BinaryPrimitives.WriteInt64LittleEndian(bufferOwner.Memory.Span, 0);
+                    bufferToWrite = bufferOwner.Memory.Slice(0, 
UncompressedLengthSize);
+                    // the local source buffer owner can be disposed, it's 
memory is no longer needed
+                    localBufferOwner?.Dispose();
                 }
                 else
                 {
                     // See format/Message.fbs, and the BUFFER 
BodyCompressionMethod for documentation on how
                     // compressed buffers are stored.
-                    _compressionStream.Seek(0, SeekOrigin.Begin);
-                    _compressionStream.SetLength(0);
-                    _compressionCodec.Compress(buffer, _compressionStream);
-                    if (_compressionStream.Length < buffer.Length)
+                    int newBufferLength = UncompressedLengthSize + 
buffer.Length;
+                    bufferOwner = _allocator.Allocate(newBufferLength);
+
+                    if(TryCompress(buffer, 
bufferOwner.Memory.Slice(UncompressedLengthSize, buffer.Length), out int 
bytesWritten))
                     {
-                        var newBuffer = _allocator.Allocate((int) 
_compressionStream.Length + UncompressedLengthSize);
-                        
BinaryPrimitives.WriteInt64LittleEndian(newBuffer.Memory.Span, buffer.Length);
-                        _compressionStream.Seek(0, SeekOrigin.Begin);
-                        
_compressionStream.ReadFullBuffer(newBuffer.Memory.Slice(UncompressedLengthSize));
-                        bufferToWrite = newBuffer.Memory;
+                        // Write the uncompressed length to the start of the 
buffer
+                        
BinaryPrimitives.WriteInt64LittleEndian(bufferOwner.Memory.Span, buffer.Length);
+                        bufferToWrite = bufferOwner.Memory.Slice(0, 
bytesWritten+UncompressedLengthSize);
                     }
                     else
                     {
+                        // TryCompress failed, because the buffer is too small.
                         // If the compressed buffer is larger than the 
uncompressed buffer, use the uncompressed
                         // buffer instead, and indicate this by setting the 
uncompressed length to -1
-                        var newBuffer = _allocator.Allocate(buffer.Length + 
UncompressedLengthSize);
-                        
BinaryPrimitives.WriteInt64LittleEndian(newBuffer.Memory.Span, -1);
-                        
buffer.CopyTo(newBuffer.Memory.Slice(UncompressedLengthSize));
-                        bufferToWrite = newBuffer.Memory;
+                        
BinaryPrimitives.WriteInt64LittleEndian(bufferOwner.Memory.Span, -1);
+                        
buffer.CopyTo(bufferOwner.Memory.Slice(UncompressedLengthSize));
+                        bufferToWrite = bufferOwner.Memory.Slice(0, 
newBufferLength);
                     }
+                    // the local source buffer owner can be disposed, it's 
memory was copied

Review Comment:
   ```suggestion
                       // the local source buffer owner can be disposed, its memory was copied
   ```



##########
csharp/src/Apache.Arrow/Ipc/ITryCompressionCodec.cs:
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+
+namespace Apache.Arrow.Ipc
+{
+    public interface ITryCompressionCodec : ICompressionCodec
+    {
+        /// <summary>
+        /// try to write compressed data to span

Review Comment:
   ```suggestion
           /// Try to write compressed data to a fixed length memory span
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to