This is an automated email from the ASF dual-hosted git repository.

zoltan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eab8f14b AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression (#2439)
8eab8f14b is described below

commit 8eab8f14b522d8a99df81217fc94a6d17f5c6595
Author: CamilAbraham <[email protected]>
AuthorDate: Sun Aug 27 21:06:13 2023 +0100

    AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression (#2439)
    
    * AVRO-3802: [Csharp] Fix memory leak on avro deflate codec decompression
    
    * AVRO-3802: [Csharp] Address PR comments
    
    * AVRO-3802: [Csharp] Revert IDisposable change in Encoder and Decoder
    
    * AVRO-3802: [Csharp] Remove implicit filtering of target sequence
    
    ---------
    
    Co-authored-by: Camil Abraham <[email protected]>
---
 lang/csharp/src/apache/main/File/DeflateCodec.cs   | 32 ++++-------------
 .../src/apache/main/Generic/GenericReader.cs       | 29 ++++++++--------
 .../apache/main/Generic/PreresolvingDatumReader.cs | 40 ++++++++++++----------
 lang/csharp/src/apache/main/IO/BinaryEncoder.cs    | 14 ++++----
 .../src/apache/main/Specific/SpecificReader.cs     | 24 +++++++------
 lang/csharp/src/apache/test/File/FileTests.cs      |  2 +-
 6 files changed, 64 insertions(+), 77 deletions(-)

diff --git a/lang/csharp/src/apache/main/File/DeflateCodec.cs b/lang/csharp/src/apache/main/File/DeflateCodec.cs
index 8ef9fce37..0ce37adb0 100644
--- a/lang/csharp/src/apache/main/File/DeflateCodec.cs
+++ b/lang/csharp/src/apache/main/File/DeflateCodec.cs
@@ -58,32 +58,14 @@ namespace Avro.File
         /// <inheritdoc/>
         public override byte[] Decompress(byte[] compressedData, int length)
         {
-
-            MemoryStream inStream = new MemoryStream(compressedData);
-            MemoryStream outStream = new MemoryStream();
-
-            using (DeflateStream Decompress =
-                        new DeflateStream(inStream,
-                        CompressionMode.Decompress))
-            {
-                CopyTo(Decompress, outStream);
-            }
-
-            return outStream.ToArray();
-        }
-
-        /// <summary>
-        /// Copies to stream.
-        /// </summary>
-        /// <param name="from">stream you are copying from</param>
-        /// <param name="to">stream you are copying to</param>
-        private static void CopyTo(Stream from, Stream to)
-        {
-            byte[] buffer = new byte[4096];
-            int read;
-            while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
+            using (MemoryStream inStream = new MemoryStream(compressedData, 0, 
length))
+            using (MemoryStream outStream = new MemoryStream())
             {
-                to.Write(buffer, 0, read);
+                using (DeflateStream decompress = new DeflateStream(inStream, 
CompressionMode.Decompress))
+                {
+                    decompress.CopyTo(outStream);
+                }
+                return outStream.ToArray();
             }
         }
 
diff --git a/lang/csharp/src/apache/main/Generic/GenericReader.cs b/lang/csharp/src/apache/main/Generic/GenericReader.cs
index 05139f0fc..0b945b9ff 100644
--- a/lang/csharp/src/apache/main/Generic/GenericReader.cs
+++ b/lang/csharp/src/apache/main/Generic/GenericReader.cs
@@ -19,6 +19,7 @@ using System;
 using System.Collections.Generic;
 using Avro.IO;
 using System.IO;
+using System.Linq;
 
 namespace Avro.Generic
 {
@@ -290,21 +291,21 @@ namespace Avro.Generic
                 }
             }
 
-            var defaultStream = new MemoryStream();
-            var defaultEncoder = new BinaryEncoder(defaultStream);
-            var defaultDecoder = new BinaryDecoder(defaultStream);
-            foreach (Field rf in rs)
+            using (var defaultStream = new MemoryStream())
             {
-                if (writerSchema.Contains(rf.Name)) continue;
-
-                defaultStream.Position = 0; // reset for writing
-                Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
-                defaultStream.Flush();
-                defaultStream.Position = 0; // reset for reading
-
-                object obj = null;
-                TryGetField(rec, rf.Name, rf.Pos, out obj);
-                AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema, 
defaultDecoder));
+                var defaultEncoder = new BinaryEncoder(defaultStream);
+                var defaultDecoder = new BinaryDecoder(defaultStream);
+                foreach (Field rf in rs.Fields.Where(rf => 
!writerSchema.Contains(rf.Name)))
+                {
+                    defaultStream.Position = 0; // reset for writing
+                    Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
+                    defaultStream.Flush();
+                    defaultStream.Position = 0; // reset for reading
+
+                    object obj = null;
+                    TryGetField(rec, rf.Name, rf.Pos, out obj);
+                    AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, 
rf.Schema, defaultDecoder));
+                }
             }
 
             return rec;
diff --git a/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs b/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
index 22c80407d..53270faec 100644
--- a/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
+++ b/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
@@ -198,7 +198,7 @@ namespace Avro.Generic
             var readerDefaultOrdinal = null != readerSchema.Default ? 
readerSchema.Ordinal(readerSchema.Default) : -1;
 
             foreach (var symbol in writerSchema.Symbols)
-            { 
+            {
                 var writerOrdinal = writerSchema.Ordinal(symbol);
                 if (readerSchema.Contains(symbol))
                 {
@@ -274,27 +274,29 @@ namespace Avro.Generic
             {
                 if (writerSchema.Contains(rf.Name)) continue;
 
-                var defaultStream = new MemoryStream();
-                var defaultEncoder = new BinaryEncoder(defaultStream);
+                using (var defaultStream = new MemoryStream())
+                {
+                    var defaultEncoder = new BinaryEncoder(defaultStream);
 
-                defaultStream.Position = 0; // reset for writing
-                Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
-                defaultStream.Flush();
-                   var defaultBytes = defaultStream.ToArray();
+                    defaultStream.Position = 0; // reset for writing
+                    Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
+                    defaultStream.Flush();
+                    var defaultBytes = defaultStream.ToArray();
 
-                var readItem = ResolveReader(rf.Schema, rf.Schema);
+                    var readItem = ResolveReader(rf.Schema, rf.Schema);
 
-                var rfInstance = rf;
-                if(IsReusable(rf.Schema.Tag))
-                {
-                    readSteps.Add((rec, d) => recordAccess.AddField(rec, 
rfInstance.Name, rfInstance.Pos,
-                        readItem(recordAccess.GetField(rec, rfInstance.Name, 
rfInstance.Pos),
-                            new BinaryDecoder(new MemoryStream( 
defaultBytes)))));
-                }
-                else
-                {
-                    readSteps.Add((rec, d) => recordAccess.AddField(rec, 
rfInstance.Name, rfInstance.Pos,
-                        readItem(null, new BinaryDecoder(new 
MemoryStream(defaultBytes)))));
+                    var rfInstance = rf;
+                    if (IsReusable(rf.Schema.Tag))
+                    {
+                        readSteps.Add((rec, d) => recordAccess.AddField(rec, 
rfInstance.Name, rfInstance.Pos,
+                            readItem(recordAccess.GetField(rec, 
rfInstance.Name, rfInstance.Pos),
+                                new BinaryDecoder(new 
MemoryStream(defaultBytes)))));
+                    }
+                    else
+                    {
+                        readSteps.Add((rec, d) => recordAccess.AddField(rec, 
rfInstance.Name, rfInstance.Pos,
+                            readItem(null, new BinaryDecoder(new 
MemoryStream(defaultBytes)))));
+                    }
                 }
             }
 
diff --git a/lang/csharp/src/apache/main/IO/BinaryEncoder.cs b/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
index 30100bf31..72af5f3a5 100644
--- a/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
+++ b/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
@@ -25,7 +25,7 @@ namespace Avro.IO
     /// </summary>
     public class BinaryEncoder : Encoder
     {
-        private readonly Stream Stream;
+        private readonly Stream stream;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="BinaryEncoder"/> 
class without a backing
@@ -42,7 +42,7 @@ namespace Avro.IO
         /// <param name="stream">Stream to write to.</param>
         public BinaryEncoder(Stream stream)
         {
-            this.Stream = stream;
+            this.stream = stream;
         }
 
         /// <summary>
@@ -203,22 +203,22 @@ namespace Avro.IO
         /// <inheritdoc/>
         public void WriteFixed(byte[] data, int start, int len)
         {
-            Stream.Write(data, start, len);
+            stream.Write(data, start, len);
         }
 
         private void writeBytes(byte[] bytes)
         {
-            Stream.Write(bytes, 0, bytes.Length);
+            stream.Write(bytes, 0, bytes.Length);
         }
 
         private void writeBytes(byte[] bytes, int offset, int length)
         {
-            Stream.Write(bytes, offset, length);
+            stream.Write(bytes, offset, length);
         }
 
         private void writeByte(byte b)
         {
-            Stream.WriteByte(b);
+            stream.WriteByte(b);
         }
 
         /// <summary>
@@ -226,7 +226,7 @@ namespace Avro.IO
         /// </summary>
         public void Flush()
         {
-            Stream.Flush();
+            stream.Flush();
         }
     }
 }
diff --git a/lang/csharp/src/apache/main/Specific/SpecificReader.cs b/lang/csharp/src/apache/main/Specific/SpecificReader.cs
index 2736cc189..1019fa36c 100644
--- a/lang/csharp/src/apache/main/Specific/SpecificReader.cs
+++ b/lang/csharp/src/apache/main/Specific/SpecificReader.cs
@@ -130,20 +130,22 @@ namespace Avro.Specific
                 }
             }
 
-            var defaultStream = new MemoryStream();
-            var defaultEncoder = new BinaryEncoder(defaultStream);
-            var defaultDecoder = new BinaryDecoder(defaultStream);
-            foreach (Field rf in rs)
+            using (var defaultStream = new MemoryStream())
             {
-                if (writerSchema.Contains(rf.Name)) continue;
+                var defaultEncoder = new BinaryEncoder(defaultStream);
+                var defaultDecoder = new BinaryDecoder(defaultStream);
+                foreach (Field rf in rs)
+                {
+                    if (writerSchema.Contains(rf.Name)) continue;
 
-                defaultStream.Position = 0; // reset for writing
-                Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
-                defaultStream.Flush();
-                defaultStream.Position = 0; // reset for reading
+                    defaultStream.Position = 0; // reset for writing
+                    Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema, 
rf.DefaultValue);
+                    defaultStream.Flush();
+                    defaultStream.Position = 0; // reset for reading
 
-                obj = rec.Get(rf.Pos);
-                rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, 
defaultDecoder));
+                    obj = rec.Get(rf.Pos);
+                    rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema, 
defaultDecoder));
+                }
             }
 
             return rec;
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 666318b68..0ef81c976 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
+using System.Diagnostics;
 using System.IO;
 using System.IO.Compression;
 using System.Linq;
@@ -555,7 +556,6 @@ namespace Avro.Test.File
         /// position in stream
         /// </summary>
         /// <param name="schemaStr"></param>
-        /// <param name="value"></param>
         /// <param name="codecType"></param>
         [TestCaseSource(nameof(TestPartialReadSource))]
         public void TestPartialRead(string schemaStr, Codec.Type codecType, 
int position, int expectedRecords)

Reply via email to