This is an automated email from the ASF dual-hosted git repository.
zoltan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 8eab8f14b AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression (#2439)
8eab8f14b is described below
commit 8eab8f14b522d8a99df81217fc94a6d17f5c6595
Author: CamilAbraham <[email protected]>
AuthorDate: Sun Aug 27 21:06:13 2023 +0100
AVRO-3802: [Csharp] Fix memory leak on deflate codec decompression (#2439)
* AVRO-3802: [Csharp] Fix memory leak on avro deflate codec decompression
* AVRO-3802: [Csharp] Address PR comments
* AVRO-3802: [Csharp] Revert IDisposable change in Encoder and Decoder
* AVRO-3802: [Csharp] Remove implicit filtering of target sequence
---------
Co-authored-by: Camil Abraham <[email protected]>
---
lang/csharp/src/apache/main/File/DeflateCodec.cs | 32 ++++-------------
.../src/apache/main/Generic/GenericReader.cs | 29 ++++++++--------
.../apache/main/Generic/PreresolvingDatumReader.cs | 40 ++++++++++++----------
lang/csharp/src/apache/main/IO/BinaryEncoder.cs | 14 ++++----
.../src/apache/main/Specific/SpecificReader.cs | 24 +++++++------
lang/csharp/src/apache/test/File/FileTests.cs | 2 +-
6 files changed, 64 insertions(+), 77 deletions(-)
diff --git a/lang/csharp/src/apache/main/File/DeflateCodec.cs b/lang/csharp/src/apache/main/File/DeflateCodec.cs
index 8ef9fce37..0ce37adb0 100644
--- a/lang/csharp/src/apache/main/File/DeflateCodec.cs
+++ b/lang/csharp/src/apache/main/File/DeflateCodec.cs
@@ -58,32 +58,14 @@ namespace Avro.File
/// <inheritdoc/>
public override byte[] Decompress(byte[] compressedData, int length)
{
-
- MemoryStream inStream = new MemoryStream(compressedData);
- MemoryStream outStream = new MemoryStream();
-
- using (DeflateStream Decompress =
- new DeflateStream(inStream,
- CompressionMode.Decompress))
- {
- CopyTo(Decompress, outStream);
- }
-
- return outStream.ToArray();
- }
-
- /// <summary>
- /// Copies to stream.
- /// </summary>
- /// <param name="from">stream you are copying from</param>
- /// <param name="to">stream you are copying to</param>
- private static void CopyTo(Stream from, Stream to)
- {
- byte[] buffer = new byte[4096];
- int read;
- while ((read = from.Read(buffer, 0, buffer.Length)) != 0)
+ using (MemoryStream inStream = new MemoryStream(compressedData, 0,
length))
+ using (MemoryStream outStream = new MemoryStream())
{
- to.Write(buffer, 0, read);
+ using (DeflateStream decompress = new DeflateStream(inStream,
CompressionMode.Decompress))
+ {
+ decompress.CopyTo(outStream);
+ }
+ return outStream.ToArray();
}
}
diff --git a/lang/csharp/src/apache/main/Generic/GenericReader.cs b/lang/csharp/src/apache/main/Generic/GenericReader.cs
index 05139f0fc..0b945b9ff 100644
--- a/lang/csharp/src/apache/main/Generic/GenericReader.cs
+++ b/lang/csharp/src/apache/main/Generic/GenericReader.cs
@@ -19,6 +19,7 @@ using System;
using System.Collections.Generic;
using Avro.IO;
using System.IO;
+using System.Linq;
namespace Avro.Generic
{
@@ -290,21 +291,21 @@ namespace Avro.Generic
}
}
- var defaultStream = new MemoryStream();
- var defaultEncoder = new BinaryEncoder(defaultStream);
- var defaultDecoder = new BinaryDecoder(defaultStream);
- foreach (Field rf in rs)
+ using (var defaultStream = new MemoryStream())
{
- if (writerSchema.Contains(rf.Name)) continue;
-
- defaultStream.Position = 0; // reset for writing
- Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
- defaultStream.Flush();
- defaultStream.Position = 0; // reset for reading
-
- object obj = null;
- TryGetField(rec, rf.Name, rf.Pos, out obj);
- AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema, rf.Schema,
defaultDecoder));
+ var defaultEncoder = new BinaryEncoder(defaultStream);
+ var defaultDecoder = new BinaryDecoder(defaultStream);
+ foreach (Field rf in rs.Fields.Where(rf =>
!writerSchema.Contains(rf.Name)))
+ {
+ defaultStream.Position = 0; // reset for writing
+ Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
+ defaultStream.Flush();
+ defaultStream.Position = 0; // reset for reading
+
+ object obj = null;
+ TryGetField(rec, rf.Name, rf.Pos, out obj);
+ AddField(rec, rf.Name, rf.Pos, Read(obj, rf.Schema,
rf.Schema, defaultDecoder));
+ }
}
return rec;
diff --git a/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs b/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
index 22c80407d..53270faec 100644
--- a/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
+++ b/lang/csharp/src/apache/main/Generic/PreresolvingDatumReader.cs
@@ -198,7 +198,7 @@ namespace Avro.Generic
var readerDefaultOrdinal = null != readerSchema.Default ?
readerSchema.Ordinal(readerSchema.Default) : -1;
foreach (var symbol in writerSchema.Symbols)
- {
+ {
var writerOrdinal = writerSchema.Ordinal(symbol);
if (readerSchema.Contains(symbol))
{
@@ -274,27 +274,29 @@ namespace Avro.Generic
{
if (writerSchema.Contains(rf.Name)) continue;
- var defaultStream = new MemoryStream();
- var defaultEncoder = new BinaryEncoder(defaultStream);
+ using (var defaultStream = new MemoryStream())
+ {
+ var defaultEncoder = new BinaryEncoder(defaultStream);
- defaultStream.Position = 0; // reset for writing
- Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
- defaultStream.Flush();
- var defaultBytes = defaultStream.ToArray();
+ defaultStream.Position = 0; // reset for writing
+ Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
+ defaultStream.Flush();
+ var defaultBytes = defaultStream.ToArray();
- var readItem = ResolveReader(rf.Schema, rf.Schema);
+ var readItem = ResolveReader(rf.Schema, rf.Schema);
- var rfInstance = rf;
- if(IsReusable(rf.Schema.Tag))
- {
- readSteps.Add((rec, d) => recordAccess.AddField(rec,
rfInstance.Name, rfInstance.Pos,
- readItem(recordAccess.GetField(rec, rfInstance.Name,
rfInstance.Pos),
- new BinaryDecoder(new MemoryStream(
defaultBytes)))));
- }
- else
- {
- readSteps.Add((rec, d) => recordAccess.AddField(rec,
rfInstance.Name, rfInstance.Pos,
- readItem(null, new BinaryDecoder(new
MemoryStream(defaultBytes)))));
+ var rfInstance = rf;
+ if (IsReusable(rf.Schema.Tag))
+ {
+ readSteps.Add((rec, d) => recordAccess.AddField(rec,
rfInstance.Name, rfInstance.Pos,
+ readItem(recordAccess.GetField(rec,
rfInstance.Name, rfInstance.Pos),
+ new BinaryDecoder(new
MemoryStream(defaultBytes)))));
+ }
+ else
+ {
+ readSteps.Add((rec, d) => recordAccess.AddField(rec,
rfInstance.Name, rfInstance.Pos,
+ readItem(null, new BinaryDecoder(new
MemoryStream(defaultBytes)))));
+ }
}
}
diff --git a/lang/csharp/src/apache/main/IO/BinaryEncoder.cs b/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
index 30100bf31..72af5f3a5 100644
--- a/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
+++ b/lang/csharp/src/apache/main/IO/BinaryEncoder.cs
@@ -25,7 +25,7 @@ namespace Avro.IO
/// </summary>
public class BinaryEncoder : Encoder
{
- private readonly Stream Stream;
+ private readonly Stream stream;
/// <summary>
/// Initializes a new instance of the <see cref="BinaryEncoder"/>
class without a backing
@@ -42,7 +42,7 @@ namespace Avro.IO
/// <param name="stream">Stream to write to.</param>
public BinaryEncoder(Stream stream)
{
- this.Stream = stream;
+ this.stream = stream;
}
/// <summary>
@@ -203,22 +203,22 @@ namespace Avro.IO
/// <inheritdoc/>
public void WriteFixed(byte[] data, int start, int len)
{
- Stream.Write(data, start, len);
+ stream.Write(data, start, len);
}
private void writeBytes(byte[] bytes)
{
- Stream.Write(bytes, 0, bytes.Length);
+ stream.Write(bytes, 0, bytes.Length);
}
private void writeBytes(byte[] bytes, int offset, int length)
{
- Stream.Write(bytes, offset, length);
+ stream.Write(bytes, offset, length);
}
private void writeByte(byte b)
{
- Stream.WriteByte(b);
+ stream.WriteByte(b);
}
/// <summary>
@@ -226,7 +226,7 @@ namespace Avro.IO
/// </summary>
public void Flush()
{
- Stream.Flush();
+ stream.Flush();
}
}
}
diff --git a/lang/csharp/src/apache/main/Specific/SpecificReader.cs b/lang/csharp/src/apache/main/Specific/SpecificReader.cs
index 2736cc189..1019fa36c 100644
--- a/lang/csharp/src/apache/main/Specific/SpecificReader.cs
+++ b/lang/csharp/src/apache/main/Specific/SpecificReader.cs
@@ -130,20 +130,22 @@ namespace Avro.Specific
}
}
- var defaultStream = new MemoryStream();
- var defaultEncoder = new BinaryEncoder(defaultStream);
- var defaultDecoder = new BinaryDecoder(defaultStream);
- foreach (Field rf in rs)
+ using (var defaultStream = new MemoryStream())
{
- if (writerSchema.Contains(rf.Name)) continue;
+ var defaultEncoder = new BinaryEncoder(defaultStream);
+ var defaultDecoder = new BinaryDecoder(defaultStream);
+ foreach (Field rf in rs)
+ {
+ if (writerSchema.Contains(rf.Name)) continue;
- defaultStream.Position = 0; // reset for writing
- Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
- defaultStream.Flush();
- defaultStream.Position = 0; // reset for reading
+ defaultStream.Position = 0; // reset for writing
+ Resolver.EncodeDefaultValue(defaultEncoder, rf.Schema,
rf.DefaultValue);
+ defaultStream.Flush();
+ defaultStream.Position = 0; // reset for reading
- obj = rec.Get(rf.Pos);
- rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema,
defaultDecoder));
+ obj = rec.Get(rf.Pos);
+ rec.Put(rf.Pos, Read(obj, rf.Schema, rf.Schema,
defaultDecoder));
+ }
}
return rec;
diff --git a/lang/csharp/src/apache/test/File/FileTests.cs b/lang/csharp/src/apache/test/File/FileTests.cs
index 666318b68..0ef81c976 100644
--- a/lang/csharp/src/apache/test/File/FileTests.cs
+++ b/lang/csharp/src/apache/test/File/FileTests.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
+using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
@@ -555,7 +556,6 @@ namespace Avro.Test.File
/// position in stream
/// </summary>
/// <param name="schemaStr"></param>
- /// <param name="value"></param>
/// <param name="codecType"></param>
[TestCaseSource(nameof(TestPartialReadSource))]
public void TestPartialRead(string schemaStr, Codec.Type codecType,
int position, int expectedRecords)