[
https://issues.apache.org/jira/browse/AVRO-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769421#comment-17769421
]
Edmond Wong commented on AVRO-3856:
-----------------------------------
I do not think my pull request: https://github.com/apache/avro/pull/2519 to
Apache Avro project will be approved anytime soon. In the meanwhile, I found
out I don't the information in the deepest structure in the Avro schema I got.
For the workaround, I have removed the deepest structure from the schema ,
regenerate the SpecificRecord classes, and I have use a custom deserializer to
use the minimized schema. Since I no longer can use the original schema stored
in schema register without Apache Avro schema parse failing, I will have to
maintain the minimized schema and make sure it is compatible with the original
schema. Here is the custom deserializer if anyone interested:
using System.Diagnostics;
using System.Net;
using Avro.Generic;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;
namespace EntitlementConsumer;
public class MinAvroDeserializer<T> : IAsyncDeserializer<T>
{
private readonly string _minSchema;
private readonly Dictionary<int, DatumReader<T>?> _datumReaderBySchemaId =
new();
private readonly SemaphoreSlim _deserializeMutex = new(1);
private Avro.Schema ReaderSchema \{ get; }
public MinAvroDeserializer(string minSchema)
{
_minSchema = minSchema;
if (typeof (ISpecificRecord).IsAssignableFrom(typeof (T)))
ReaderSchema = ((ISpecificRecord)
Activator.CreateInstance<T>()).Schema;
else if (typeof (T) == typeof (int))
ReaderSchema = Avro.Schema.Parse("int");
else if (typeof (T) == typeof (bool))
ReaderSchema = Avro.Schema.Parse("boolean");
else if (typeof (T) == typeof (double))
ReaderSchema = Avro.Schema.Parse("double");
else if (typeof (T) == typeof (string))
ReaderSchema = Avro.Schema.Parse("string");
else if (typeof (T) == typeof (float))
ReaderSchema = Avro.Schema.Parse("float");
else if (typeof (T) == typeof (long))
ReaderSchema = Avro.Schema.Parse("long");
else if (typeof (T) == typeof (byte[]))
{
ReaderSchema = Avro.Schema.Parse("bytes");
}
else
{
if (typeof (T) != typeof (Null))
throw new InvalidOperationException("AvroDeserializer only
accepts type parameters of int, bool, double, string, float, long, byte[],
instances of ISpecificRecord and subclasses of SpecificFixed.");
ReaderSchema = Avro.Schema.Parse("null");
}
}
public async Task<T> DeserializeAsync(
ReadOnlyMemory<byte> data,
bool isNull,
SerializationContext context)
{
try
{
if (isNull)
{
if (default (T) == null)
return default (T) ?? throw new
InvalidOperationException();;
throw new InvalidOperationException("Cannot deserialize null to
a Value Type");
}
T obj;
if (isNull)
obj = default (T) ?? throw new InvalidOperationException();
else
obj = await Deserialize(context.Topic,
data.ToArray()).ConfigureAwait(false);
return obj;
}
catch (AggregateException ex)
{
Debug.Assert(ex.InnerException != null, "ex.InnerException !=
null");
throw ex.InnerException;
}
}
private async Task<T> Deserialize(string topic, byte[] array)
{
GenericRecord genericRecord;
try
{
if (array.Length < 5)
throw new InvalidDataException(
$"Expecting data framing of length 5 bytes or more but
total data size is \{(object)array.Length} bytes");
using var stream = new MemoryStream(array);
using var reader = new BinaryReader(stream);
var writerId = reader.ReadByte() == 0 ?
IPAddress.NetworkToHostOrder(reader.ReadInt32()) : throw new
InvalidDataException(
$"Expecting data with Confluent Schema Registry framing. Magic
byte was \{(object)array[0]}, expecting \{(object)(byte)0}");
await _deserializeMutex.WaitAsync().ConfigureAwait(false);
DatumReader<T>? datumReader;
try
{
_datumReaderBySchemaId.TryGetValue(writerId, out datumReader);
if (datumReader == null)
{
datumReader = new
SpecificReader<T>(Avro.Schema.Parse(_minSchema), this.ReaderSchema);
_datumReaderBySchemaId[writerId] = datumReader;
}
}
finally
{
_deserializeMutex.Release();
}
if (!typeof (ISpecificRecord).IsAssignableFrom(typeof (T)))
return datumReader.Read(default, new BinaryDecoder(stream));
var instance = Activator.CreateInstance<T>();
return datumReader.Read(instance, new BinaryDecoder(stream));
}
catch (AggregateException ex)
{
Debug.Assert(ex.InnerException != null, "ex.InnerException !=
null");
throw ex.InnerException;
}
}
}
> Cannot customize MaxDepth when parsing Avro schema in C#
> --------------------------------------------------------
>
> Key: AVRO-3856
> URL: https://issues.apache.org/jira/browse/AVRO-3856
> Project: Apache Avro
> Issue Type: Bug
> Components: csharp
> Affects Versions: 1.11.2
> Reporter: Emanuele Sabellico
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> When parsing the Avro Schema with Newtonsoft.Json max depth is limited to 64,
> the default. It cannot be customized because it's using JObject.Parse that
> doesn't allow to customize it.
> Using JsonConvert can allow to change default MaxDepth
> {code:c#}
> JsonConvert.DefaultSettings = () =>
> {
> return new JsonSerializerSettings
> {
> MaxDepth = 100
> };
> };
> var schema = File.ReadAllText("schema.avsc");
> var json = JsonConvert.DeserializeObject<JObject>(schema);
> {code}
> [https://github.com/apache/avro/blob/41b3c08ca5da192786c2b08546e691b3126e1856/lang/csharp/src/apache/main/Schema/Schema.cs#L250]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)