This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ae5a8  Avro ISpecificRecord schema support (#253)
15ae5a8 is described below

commit 15ae5a8216b0865094abfd8e5d73055acd4a639c
Author: Jon Klinaku <[email protected]>
AuthorDate: Tue Feb 25 09:37:34 2025 +0100

    Avro ISpecificRecord schema support (#253)
    
    * added Schema to CommandProducer
    
    * Added AvroISpecificRecord Schema Support when generated by AvroGen tool
    
    * Modified from library to reflection Avro schema and added schema tests
    
    * added some integration tests and modified consumer to accept schema in 
the Subscribe command
    
    * removed warnings in DotPulsar AvroISpecificRecordSchema.cs
    
    * added static constructor to AvroISpecificSchema and some minor changes!
    
    * removed unused "_AvroAssemblyTypeInfos" in AvroISpecificRecordSchema
---
 src/DotPulsar/Exceptions/SchemaException.cs        |  29 +++
 src/DotPulsar/Internal/Consumer.cs                 |   3 +-
 .../Internal/Extensions/TypeExtensions.cs          |  34 +++
 src/DotPulsar/Internal/ProducerChannelFactory.cs   |   6 +-
 src/DotPulsar/Schema.cs                            |   7 +-
 src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs | 257 +++++++++++++++++++++
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |   1 +
 tests/DotPulsar.Tests/IntegrationFixture.cs        |  19 ++
 tests/DotPulsar.Tests/Internal/ConsumerTests.cs    |  57 ++++-
 tests/DotPulsar.Tests/Internal/ProducerTests.cs    |  51 +++-
 .../Schemas/AvroISpecificRecordSchemaTests.cs      |  61 +++++
 .../TestSamples/AvroModels/AvroBlankSampleModel.cs |  20 ++
 .../TestSamples/AvroModels/AvroSampleModel.cs      |  71 ++++++
 .../AvroSampleModelWithWrongSCHEMAField.cs         |  34 +++
 14 files changed, 628 insertions(+), 22 deletions(-)

diff --git a/src/DotPulsar/Exceptions/SchemaException.cs 
b/src/DotPulsar/Exceptions/SchemaException.cs
new file mode 100644
index 0000000..7561c4d
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SchemaException.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// Represents any error related to schemas.
+/// </summary>
+public sealed class SchemaException : DotPulsarException
+{
+    /// <summary>
+    /// Creates the exception with the given message, which is passed to the base class.
+    /// </summary>
+    public SchemaException(string message)
+        : base(message) { }
+
+    /// <summary>
+    /// Creates the exception with the given message and inner exception, both passed to the base class.
+    /// </summary>
+    public SchemaException(string message, Exception innerException)
+        : base(message, innerException) { }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 142d63b..4a0eec4 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -434,7 +434,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
             ReplicateSubscriptionState = 
_consumerOptions.ReplicateSubscriptionState,
             Subscription = _consumerOptions.SubscriptionName,
             Topic = topic,
-            Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
+            Type = (CommandSubscribe.SubType) 
_consumerOptions.SubscriptionType,
+            Schema = _consumerOptions.Schema.SchemaInfo.PulsarSchema
         };
 
         foreach (var property in _consumerOptions.SubscriptionProperties)
diff --git a/src/DotPulsar/Internal/Extensions/TypeExtensions.cs 
b/src/DotPulsar/Internal/Extensions/TypeExtensions.cs
new file mode 100644
index 0000000..d9af663
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/TypeExtensions.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions;
+
+public static class TypeExtensions
+{
+    /// <summary>
+    /// Determines whether the type itself, or any type in its base-class chain, has the given full name.
+    /// Only base classes are walked; implemented interfaces are not inspected.
+    /// </summary>
+    /// <param name="typeToCheck">The type at which the walk starts (it is checked first).</param>
+    /// <param name="fullNameToCheckAgainst">The full type name to look for.</param>
+    /// <returns>true if a type in the chain has that full name; otherwise false.</returns>
+    public static bool ImplementsBaseTypeFullName(this Type typeToCheck, string fullNameToCheckAgainst)
+    {
+        for (Type? current = typeToCheck; current is not null; current = current.BaseType)
+        {
+            // Types without a full name (e.g. generic type parameters) can never match, but the walk
+            // must still advance past them; skipping without advancing would loop forever.
+            var fullName = current.FullName;
+            if (!string.IsNullOrEmpty(fullName) &&
+                string.Equals(fullName, fullNameToCheckAgainst, StringComparison.Ordinal))
+                return true;
+        }
+
+        return false;
+    }
+}
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs 
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 70bef2c..84b4e11 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -42,19 +42,19 @@ public sealed class ProducerChannelFactory : 
IProducerChannelFactory
         _correlationId = correlationId;
         _eventRegister = eventRegister;
         _connectionPool = connectionPool;
-
+        _schema = schemaInfo.PulsarSchema;
         _commandProducer = new CommandProducer
         {
             ProducerName = producerName,
             ProducerAccessMode = producerAccessMode,
-            Topic = topic
+            Topic = topic,
+            Schema = _schema
         };
 
         if (properties is not null)
             _commandProducer.Metadatas.AddRange(properties.Select(x => new 
KeyValue { Key = x.Key, Value = x.Value }));
 
         _compressorFactory = compressorFactory;
-        _schema = schemaInfo.PulsarSchema;
     }
 
     public async Task<IProducerChannel> Create(CancellationToken 
cancellationToken)
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
index 8aa8375..0d5a678 100644
--- a/src/DotPulsar/Schema.cs
+++ b/src/DotPulsar/Schema.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -96,4 +96,9 @@ public static class Schema
     /// Time schema using TimeSpan.
     /// </summary>
     public static TimeSchema Time { get; }
+    /// <summary>
+    /// Creates an Avro schema for a type <typeparamref name="T"/> that implements ISpecificRecord.
+    /// </summary>
+    /// <returns>A new <see cref="AvroISpecificRecordSchema{T}"/> instance.</returns>
+    public static AvroISpecificRecordSchema<T> AvroISpecificRecord<T>()
+        => new();
 }
diff --git a/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs 
b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
new file mode 100644
index 0000000..86d2d7a
--- /dev/null
+++ b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Extensions;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+/// <summary>
+/// Schema for types that implement Avro.Specific.ISpecificRecord and expose their Avro schema
+/// through a public static field named '_SCHEMA'.
+/// The Avro assembly is loaded by name and only accessed through reflection.
+/// </summary>
+/// <typeparam name="T">The record type to encode and decode.</typeparam>
+public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
+{
+    private const string AvroAssemblyName = "Avro";
+
+    private static readonly Type _typeT;
+    private static readonly object _avroSchema;
+    private static readonly MethodInfo _avroWriterWriteMethod;
+    private static readonly MethodInfo _avroReaderReadMethod;
+    private static readonly TypeInfo _binaryEncoderTypeInfo;
+    private static readonly TypeInfo _binaryDecoderTypeInfo;
+    private static readonly Type _avroWriterTypeInfo;
+    private static readonly Type _avroReaderTypeInfo;
+    private static readonly SchemaInfo _schemaInfo;
+
+    // Failure captured by the static constructor; null when the static initialization succeeded.
+    private static readonly System.Runtime.ExceptionServices.ExceptionDispatchInfo? _constructorException;
+
+    private readonly object _avroWriter;
+    private readonly object _avroReader;
+
+    /// <summary>
+    /// The schema information built from the '_SCHEMA' field of <typeparamref name="T"/>.
+    /// </summary>
+    public SchemaInfo SchemaInfo => _schemaInfo;
+
+    // CS8618 is suppressed on purpose: when the static initialization fails, the static fields stay
+    // unassigned and the instance constructor rethrows the captured failure instead. This way a wrong
+    // ISpecificRecord implementation in T surfaces as a catchable exception on construction rather
+    // than as a type initializer failure.
+#pragma warning disable CS8618
+    static AvroISpecificRecordSchema()
+#pragma warning restore CS8618
+    {
+        const string schemaFullName = "Avro.Schema";
+        const string specificRecordFullName = "Avro.Specific.ISpecificRecord";
+        _typeT = typeof(T);
+
+        try
+        {
+            // Loaded first so a missing Avro assembly is reported before any validation of T.
+            var avroAssembly = Assembly.Load(AvroAssemblyName);
+
+            if (!_typeT.GetInterfaces().Any(i => i.FullName == specificRecordFullName))
+                throw new SchemaException($"The type {_typeT} must implement {specificRecordFullName}");
+
+            // Only public static fields are considered, so an instance field named '_SCHEMA' is
+            // reported as a SchemaException instead of failing inside FieldInfo.GetValue(null).
+            _avroSchema = _typeT.GetField("_SCHEMA", BindingFlags.Public | BindingFlags.Static)?.GetValue(null) ??
+                throw new SchemaException($"The static field named '_SCHEMA' must not be null in type: {_typeT}");
+
+            var avroSchemaType = _avroSchema.GetType();
+            if (!avroSchemaType.ImplementsBaseTypeFullName(schemaFullName))
+                throw new SchemaException($"field '_SCHEMA' must be of type {schemaFullName}");
+
+            var schemaName = (string) (avroSchemaType.GetProperty("Name")?.GetValue(_avroSchema) ?? string.Empty);
+            var schemaData = (string) (avroSchemaType
+                .GetMethod("ToString", Type.EmptyTypes)
+                ?.Invoke(_avroSchema, null) ??
+                throw new SchemaException($"Schema toString() must not return null for type {_typeT}"));
+
+            var definedTypes = avroAssembly.DefinedTypes.ToArray();
+            _avroWriterTypeInfo = FindPublicClass(definedTypes, "Avro.Specific.SpecificDatumWriter`1").MakeGenericType(_typeT);
+            _avroReaderTypeInfo = FindPublicClass(definedTypes, "Avro.Specific.SpecificDatumReader`1").MakeGenericType(_typeT);
+            _binaryEncoderTypeInfo = FindPublicClass(definedTypes, "Avro.IO.BinaryEncoder");
+            _binaryDecoderTypeInfo = FindPublicClass(definedTypes, "Avro.IO.BinaryDecoder");
+            _avroWriterWriteMethod = FindDatumMethod(_avroWriterTypeInfo.GetMethods(), "Write", typeof(void), "Avro.IO.Encoder");
+            _avroReaderReadMethod = FindDatumMethod(_avroReaderTypeInfo.GetMethods(), "Read", _typeT, "Avro.IO.Decoder");
+
+            _schemaInfo = new SchemaInfo(
+                schemaName,
+                Encoding.UTF8.GetBytes(schemaData),
+                SchemaType.Avro,
+                new Dictionary<string, string>());
+        }
+        catch (Exception e)
+        {
+            // Captured (not just stored) so every later rethrow keeps the original stack trace.
+            _constructorException = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e);
+        }
+    }
+
+    /// <summary>
+    /// Creates the schema and its Avro datum writer and reader.
+    /// </summary>
+    /// <remarks>
+    /// Rethrows whatever exception the static initialization captured, for example a
+    /// <see cref="SchemaException"/> when <typeparamref name="T"/> does not satisfy the requirements.
+    /// </remarks>
+    public AvroISpecificRecordSchema()
+    {
+        _constructorException?.Throw();
+
+        _avroWriter = Activator.CreateInstance(_avroWriterTypeInfo, _avroSchema)
+            ?? throw new SchemaException("Could not load SpecificDatumWriter");
+        // The schema is passed twice because the reader constructor is invoked with two schema arguments.
+        _avroReader = Activator.CreateInstance(_avroReaderTypeInfo, _avroSchema, _avroSchema)
+            ?? throw new SchemaException("Could not load SpecificDatumReader");
+    }
+
+    /// <summary>
+    /// Decodes the bytes into an instance of <typeparamref name="T"/> using the Avro reader.
+    /// </summary>
+    /// <param name="bytes">The encoded record.</param>
+    /// <param name="schemaVersion">Not used by this schema.</param>
+    public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+    {
+        using var stream = new MemoryStream(bytes.ToArray());
+        T? reuse = default;
+        var decoded = _avroReaderReadMethod.Invoke(_avroReader, [reuse, GetBinaryDecoder(stream)]);
+        return (T) (decoded ?? throw new SchemaSerializationException($"Could not Deserialize object of type {_typeT}"));
+    }
+
+    /// <summary>
+    /// Encodes the message with the Avro writer and returns the written bytes.
+    /// </summary>
+    /// <param name="message">The record to encode.</param>
+    public ReadOnlySequence<byte> Encode(T message)
+    {
+        using var stream = new MemoryStream();
+        _avroWriterWriteMethod.Invoke(_avroWriter, [message, GetBinaryEncoder(stream)]);
+        return new ReadOnlySequence<byte>(stream.ToArray());
+    }
+
+    // Returns the first type with the given full name, provided it is a public class.
+    private static TypeInfo FindPublicClass(IEnumerable<TypeInfo> types, string fullName)
+    {
+        var candidate = types.FirstOrDefault(type => string.Equals(type.FullName, fullName, StringComparison.Ordinal));
+        if (candidate is { IsPublic: true, IsClass: true })
+            return candidate;
+
+        throw new SchemaException($"{fullName} as a public class was not found");
+    }
+
+    // Returns the first method shaped like: returnType name(T, <type named secondParameterFullName>).
+    private static MethodInfo FindDatumMethod(
+        IEnumerable<MethodInfo> methods,
+        string name,
+        Type returnType,
+        string secondParameterFullName)
+    {
+        foreach (var method in methods)
+        {
+            if (method.Name != name || method.ReturnType != returnType)
+                continue;
+
+            var parameters = method.GetParameters();
+            if (parameters.Length != 2 || parameters[0].ParameterType != typeof(T))
+                continue;
+
+            if (string.Equals(parameters[1].ParameterType.FullName, secondParameterFullName, StringComparison.Ordinal))
+                return method;
+        }
+
+        throw new SchemaException($"A method with the name '{name}' matching the delegate was not found");
+    }
+
+    // Creates a new Avro BinaryEncoder writing to the stream.
+    private object GetBinaryEncoder(MemoryStream stream) =>
+        Activator.CreateInstance(_binaryEncoderTypeInfo, stream)
+        ?? throw new SchemaException("There was a problem while instanciating BinaryEncoder");
+
+    // Creates a new Avro BinaryDecoder reading from the stream.
+    private object GetBinaryDecoder(MemoryStream stream) =>
+        Activator.CreateInstance(_binaryDecoderTypeInfo, stream)
+        ?? throw new SchemaException("There was a problem while instanciating BinaryDecoder");
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj 
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index f244f99..20b8926 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -15,6 +15,7 @@
   </ItemGroup>
 
   <ItemGroup>
+    <PackageReference Include="Apache.Avro" Version="1.12.0" />
     <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
     <PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
     <PackageReference Include="coverlet.collector" Version="6.0.4">
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs 
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 94d3886..e7cedc3 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -18,6 +18,8 @@ using DotNet.Testcontainers.Builders;
 using DotNet.Testcontainers.Containers;
 using DotNet.Testcontainers.Networks;
 using DotPulsar.Abstractions;
+using System.Text;
+using System.Text.Json;
 using Testcontainers.Pulsar;
 using Toxiproxy.Net;
 using Xunit.Abstractions;
@@ -170,6 +172,23 @@ public class IntegrationFixture : IAsyncLifetime
         if (result.ExitCode != 0)
             throw new Exception($"Could not create the partitioned topic: 
{result.Stderr}");
     }
+    /// <summary>
+    /// Uploads the given schema to an existing topic by copying a schema definition file into the
+    /// cluster container and running 'pulsar-admin schemas upload' on it.
+    /// </summary>
+    /// <param name="topic">The topic to attach the schema to.</param>
+    /// <param name="pulsarSchemaInfo">The schema whose type and data are uploaded.</param>
+    /// <param name="cancellationToken">Token used for the copy and the command execution.</param>
+    /// <exception cref="Exception">The pulsar-admin command exited with a non-zero exit code.</exception>
+    public async Task AddSchemaToExistingTopic(string topic, SchemaInfo pulsarSchemaInfo, CancellationToken cancellationToken)
+    {
+        var schemaDefinition = new
+        {
+            // Invariant casing: the type name is machine-readable and must not depend on the current culture.
+            type = pulsarSchemaInfo.Type.ToString().ToUpperInvariant(),
+            schema = Encoding.UTF8.GetString(pulsarSchemaInfo.Data),
+            properties = new Dictionary<string, string>()
+        };
+
+        var schemaFilename = $"{Guid.NewGuid()}.sch";
+        var schemaFileContent = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(schemaDefinition));
+        await _pulsarCluster.CopyAsync(schemaFileContent, $"pulsar/{schemaFilename}", ct: cancellationToken);
+
+        var arguments = $"bin/pulsar-admin schemas upload --filename {schemaFilename} {topic}";
+        var result = await _pulsarCluster.ExecAsync(["/bin/bash", "-c", arguments], cancellationToken);
+
+        if (result.ExitCode != 0)
+            throw new Exception($"Could not upload a schema to topic: {result.Stderr}");
+    }
 
     public async Task<IAsyncDisposable> DisableThePulsarConnection()
     {
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs 
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index ea1f080..b379620 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Tests.Internal;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
 using System.Text.RegularExpressions;
 using Xunit.Abstractions;
 
@@ -275,6 +276,35 @@ public sealed class ConsumerTests : IDisposable
         //Assert
         exception.ShouldBeOfType<ConsumerFaultedException>();
     }
+    [Fact]
+    public async Task Receive_WhenReceivingToTopicWithSchemaAndReceiverHasWrongSchema_ShouldThrowException()
+    {
+        //Arrange
+        var topicName = await _fixture.CreateTopic(_cts.Token);
+        var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        await _fixture.AddSchemaToExistingTopic(topicName, pulsarSchema.SchemaInfo, _cts.Token);
+        var client = CreateClient();
+        await using var consumer = CreateConsumer(client, topicName, Schema.String);
+        await using var producer = CreateProducer(client, topicName, pulsarSchema);
+        await producer.Send(new AvroSampleModel(), _cts.Token);
+
+        //Act
+        // The receive is bound to the test's cancellation token rather than waiting indefinitely.
+        var exception = await Record.ExceptionAsync(consumer.Receive(_cts.Token).AsTask);
+
+        //Assert
+        exception.ShouldBeOfType<IncompatibleSchemaException>();
+    }
+    [Fact]
+    public async Task Receive_WhenReceivingToTopicWithSchemaAndReceiverHasRightSchema_ShouldBeAbleToRecieve()
+    {
+        //Arrange
+        var topic = await _fixture.CreateTopic(_cts.Token);
+        var avroSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        await _fixture.AddSchemaToExistingTopic(topic, avroSchema.SchemaInfo, _cts.Token);
+        var pulsarClient = CreateClient();
+        await using var consumer = CreateConsumer(pulsarClient, topic, avroSchema);
+        await using var producer = CreateProducer(pulsarClient, topic, avroSchema);
+        var sent = new AvroSampleModel();
+
+        //Act
+        await producer.Send(sent, _cts.Token);
+        var message = await consumer.Receive(_cts.Token);
+        var received = message.Value();
+
+        //Assert
+        received.Name.ShouldBe(sent.Name);
+        received.Surname.ShouldBe(sent.Surname);
+        received.Age.ShouldBe(sent.Age);
+    }
 
     [Fact]
     public async Task 
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
@@ -408,19 +438,30 @@ public sealed class ConsumerTests : IDisposable
 
     private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
 
-    private IProducer<string> CreateProducer(IPulsarClient pulsarClient, 
string topicName)
-        => pulsarClient.NewProducer(Schema.String)
+    private IProducer<T> CreateProducer<T>(
+        IPulsarClient pulsarClient,
+        string topicName,
+        ISchema<T> schema)
+        => pulsarClient.NewProducer(schema)
         .Topic(topicName)
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
+    // Creates a string producer for the topic. The builder is configured here directly so that
+    // the requested access mode is actually applied instead of being dropped.
+    private IProducer<string> CreateProducer(
+        IPulsarClient pulsarClient,
+        string topicName,
+        ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+        => pulsarClient.NewProducer(Schema.String)
+        .Topic(topicName)
+        .ProducerAccessMode(producerAccessMode)
+        .StateChangedHandler(_testOutputHelper.Log)
+        .Create();
+
 
     private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string topicName)
-        => pulsarClient.NewConsumer(Schema.String)
-        .InitialPosition(SubscriptionInitialPosition.Earliest)
-        .SubscriptionName(CreateSubscriptionName())
-        .Topic(topicName)
-        .StateChangedHandler(_testOutputHelper.Log)
-        .Create();
+        => CreateConsumer(pulsarClient, topicName, Schema.String);
+    // Creates a consumer with the given schema that reads the topic from the earliest position
+    // under a freshly generated subscription name.
+    private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string topicName, ISchema<T> schema)
+    {
+        var builder = pulsarClient.NewConsumer(schema)
+            .InitialPosition(SubscriptionInitialPosition.Earliest)
+            .SubscriptionName(CreateSubscriptionName())
+            .Topic(topicName)
+            .StateChangedHandler(_testOutputHelper.Log);
+
+        return builder.Create();
+    }
 
     private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
IEnumerable<string> topics)
         => pulsarClient.NewConsumer(Schema.String)
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs 
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index bcb6e09..1554cb6 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -16,6 +16,8 @@ namespace DotPulsar.Tests.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+using System.Text;
 using Xunit.Abstractions;
 
 [Collection("Integration"), Trait("Category", "Integration")]
@@ -267,7 +269,28 @@ public sealed class ProducerTests : IDisposable
 
         foundNonNegativeOne.ShouldBeTrue();
     }
-
+    [Fact]
+    public async Task Send_WhenProducingToTopicWithSchemaAndProducerHasWrongSchema_ShouldThrowException()
+    {
+        //Arrange
+        var topic = await _fixture.CreateTopic(_cts.Token);
+        var avroSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        await _fixture.AddSchemaToExistingTopic(topic, avroSchema.SchemaInfo, _cts.Token);
+        var pulsarClient = CreateClient();
+        await using var producer = CreateProducer(pulsarClient, topic, Schema.ByteSequence);
+        var payload = Encoding.UTF8.GetBytes("test");
+
+        //Act
+        // The send is started before it is handed over, exactly one send is issued.
+        var sending = producer.Send(payload, _cts.Token);
+        var exception = await Record.ExceptionAsync(sending.AsTask);
+
+        //Assert
+        exception.ShouldBeAssignableTo<Exception>();
+    }
+    [Fact]
+    public async Task Send_WhenProducingToTopicWithSchemaAndProducerHasRightSchema_ShouldBeAbleToSend()
+    {
+        //Arrange
+        var topic = await _fixture.CreateTopic(_cts.Token);
+        var avroSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        await _fixture.AddSchemaToExistingTopic(topic, avroSchema.SchemaInfo, _cts.Token);
+        var pulsarClient = CreateClient();
+        await using var producer = CreateProducer(pulsarClient, topic, avroSchema);
+
+        //Act
+        var sending = producer.Send(new AvroSampleModel(), _cts.Token);
+        var exception = await Record.ExceptionAsync(sending.AsTask);
+
+        //Assert
+        exception.ShouldBeNull();
+    }
     [Fact]
     public async Task 
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown()
     {
@@ -394,23 +417,33 @@ public sealed class ProducerTests : IDisposable
 
     private static string CreateSubscriptionName() => 
$"subscription-{Guid.NewGuid():N}";
 
-    private IProducer<string> CreateProducer(
+
+    private IProducer<T> CreateProducer<T>(
         IPulsarClient pulsarClient,
         string topicName,
+        ISchema<T> schema,
         ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
-        => pulsarClient.NewProducer(Schema.String)
+        => pulsarClient.NewProducer(schema)
         .Topic(topicName)
         .ProducerAccessMode(producerAccessMode)
         .StateChangedHandler(_testOutputHelper.Log)
         .Create();
+    // Convenience overload: creates a string producer by delegating to the schema-based overload.
+    private IProducer<string> CreateProducer(
+        IPulsarClient pulsarClient,
+        string topicName,
+        ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+    {
+        return CreateProducer<string>(pulsarClient, topicName, Schema.String, producerAccessMode);
+    }
+
 
     private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, 
string topicName)
-        => pulsarClient.NewConsumer(Schema.String)
-        .InitialPosition(SubscriptionInitialPosition.Earliest)
-        .SubscriptionName(CreateSubscriptionName())
-        .Topic(topicName)
-        .StateChangedHandler(_testOutputHelper.Log)
-        .Create();
+        => CreateConsumer(pulsarClient, topicName, Schema.String);
+    // Creates a consumer with the given schema that reads the topic from the earliest position
+    // under a freshly generated subscription name.
+    private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string topicName, ISchema<T> schema)
+    {
+        var builder = pulsarClient.NewConsumer(schema)
+            .InitialPosition(SubscriptionInitialPosition.Earliest)
+            .SubscriptionName(CreateSubscriptionName())
+            .Topic(topicName)
+            .StateChangedHandler(_testOutputHelper.Log);
+
+        return builder.Create();
+    }
 
     private IPulsarClient CreateClient()
         => PulsarClient
diff --git a/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs 
b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
new file mode 100644
index 0000000..d702470
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+using DotPulsar.Exceptions;
+using DotPulsar.Schemas;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+using System.Buffers;
+
+/// <summary>
+/// Unit tests for creating, encoding with and decoding with <see cref="AvroISpecificRecordSchema{T}"/>.
+/// </summary>
+[Trait("Category", "Unit")]
+public sealed class AvroISpecificRecordSchemaTests
+{
+    [Fact]
+    public void Schema_ClassNotImplementingISpecific_ShouldThrowException()
+    {
+        //Act
+        var exception = Record.Exception(Schema.AvroISpecificRecord<AvroBlankSampleModel>);
+
+        //Assert
+        exception.ShouldBeOfType<SchemaException>();
+    }
+
+    [Fact]
+    public void Schema_ClassStaticSchemaFIeldWrongType_ShouldThrowException()
+    {
+        //Act
+        var exception = Record.Exception(Schema.AvroISpecificRecord<AvroSampleModelWithWrongSCHEMAField>);
+
+        //Assert
+        exception.ShouldBeOfType<SchemaException>();
+    }
+
+    [Fact]
+    public void Schema_ClassImplementsISpecificRecordCorrectly_ShouldReturnSchema()
+    {
+        //Act
+        var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+
+        //Assert
+        pulsarSchema.ShouldBeOfType<AvroISpecificRecordSchema<AvroSampleModel>>();
+    }
+
+    [Fact]
+    public void Schema_GivenDataAndImplementsEncodeProperly_ShouldReturnReadOnlySequenceOfBytes()
+    {
+        //Arrange
+        var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        var model = new AvroSampleModel { Name = "Jon", Surname = "Klinaku", Age = 27 };
+
+        //Act
+        var encoded = pulsarSchema.Encode(model);
+
+        //Assert
+        // A type check on a statically typed value cannot fail, so the content is verified instead:
+        // the bytes must be non-empty and decode back to the same field values.
+        encoded.IsEmpty.ShouldBeFalse();
+        var decoded = pulsarSchema.Decode(encoded);
+        decoded.Name.ShouldBe(model.Name);
+        decoded.Surname.ShouldBe(model.Surname);
+        decoded.Age.ShouldBe(model.Age);
+    }
+
+    [Fact]
+    public void Schema_GivenDataAndImplementsDecodeProperly_ShouldReturnCorrectObject()
+    {
+        //Arrange
+        var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+        var bytes = new ReadOnlySequence<byte>([16, 77, 97, 114, 105, 103, 111, 110, 97, 8, 67, 101, 116, 97, 58]);
+
+        //Act
+        var decoded = pulsarSchema.Decode(bytes);
+
+        //Assert
+        decoded.ShouldBeOfType<AvroSampleModel>();
+        decoded.Name.ShouldBe("Marigona");
+        decoded.Surname.ShouldBe("Ceta");
+        decoded.Age.ShouldBe(29);
+    }
+}
diff --git 
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs 
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
new file mode 100644
index 0000000..a16a4c4
--- /dev/null
+++ 
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+
+/// <summary>
+/// Empty test model that intentionally does NOT implement ISpecificRecord.
+/// AvroISpecificRecordSchemaTests passes it to Schema.AvroISpecificRecord and expects a SchemaException.
+/// </summary>
+public class AvroBlankSampleModel
+{
+}
diff --git 
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs 
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
new file mode 100644
index 0000000..4ec16bb
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+using Avro.Specific;
+using System;
+
+
+/// <summary>
+/// Hand-written test model that implements <see cref="ISpecificRecord"/> for a record with
+/// three fields: Name (string), Surname (string) and Age (int).
+/// </summary>
+public class AvroSampleModel : ISpecificRecord
+{
+    /// <summary>
+    /// Avro schema describing this record. The field positions used by <see cref="Get"/> and
+    /// <see cref="Put"/> follow the order of the "fields" array below.
+    /// </summary>
+    // NOTE(review): the static field name "_SCHEMA" is presumably looked up by the schema
+    // implementation via reflection (AvroGen convention) - do not rename without confirming.
+    public static readonly Avro.Schema _SCHEMA = Avro.Schema.Parse(@"
+        {
+            ""type"": ""record"",
+            ""name"": ""AvroSampleModel"",
+            ""fields"": [
+                { ""name"": ""Name"", ""type"": ""string"" },
+                { ""name"": ""Surname"", ""type"": ""string"" },
+                { ""name"": ""Age"", ""type"": ""int"" }
+            ]
+        }");
+
+    /// <summary>Creates a model pre-populated with sample values.</summary>
+    public AvroSampleModel()
+    {
+        Name = "Jon";
+        Surname = "Klinaku";
+        Age = 27;
+    }
+
+    /// <summary>The Avro schema of this record (the shared static <see cref="_SCHEMA"/>).</summary>
+    public virtual Avro.Schema Schema => _SCHEMA;
+
+    public string Name { get; set; }
+    public string Surname { get; set; }
+    public int Age { get; set; }
+
+    /// <summary>Returns the value of the field at the given schema position.</summary>
+    /// <param name="fieldPos">0 = Name, 1 = Surname, 2 = Age.</param>
+    /// <exception cref="ArgumentOutOfRangeException">The position is not 0, 1 or 2.</exception>
+    public object Get(int fieldPos)
+    {
+        return fieldPos switch
+        {
+            0 => Name,
+            1 => Surname,
+            2 => Age,
+            _ => throw new ArgumentOutOfRangeException(nameof(fieldPos), "Invalid field position")
+        };
+    }
+
+    /// <summary>Assigns the value of the field at the given schema position.</summary>
+    /// <param name="fieldPos">0 = Name, 1 = Surname, 2 = Age.</param>
+    /// <param name="value">The new value; must be a string for positions 0 and 1, an int for position 2.</param>
+    /// <exception cref="ArgumentException">The value is null or of the wrong type for the field.</exception>
+    /// <exception cref="ArgumentOutOfRangeException">The position is not 0, 1 or 2.</exception>
+    public void Put(int fieldPos, object value)
+    {
+        switch (fieldPos)
+        {
+            case 0:
+                Name = value as string ?? throw new ArgumentException("Name must be a string");
+                break;
+            case 1:
+                // Fixed copy/paste error: this branch previously reported "Name must be a string".
+                Surname = value as string ?? throw new ArgumentException("Surname must be a string");
+                break;
+            case 2:
+                Age = value is int intValue ? intValue : throw new ArgumentException("Age must be an int");
+                break;
+            default:
+                throw new ArgumentOutOfRangeException(nameof(fieldPos), "Invalid field position");
+        }
+    }
+}
diff --git 
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
 
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
new file mode 100644
index 0000000..24a3c68
--- /dev/null
+++ 
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+using Avro.Specific;
+using System;
+
+/// <summary>
+/// Negative test fixture: implements <see cref="ISpecificRecord"/> but declares its static
+/// _SCHEMA field as a string instead of an Avro.Schema. AvroISpecificRecordSchemaTests expects
+/// Schema.AvroISpecificRecord to reject it with a SchemaException.
+/// </summary>
+public class AvroSampleModelWithWrongSCHEMAField : ISpecificRecord
+{
+    // Deliberately the wrong type (string rather than Avro.Schema).
+    public static string _SCHEMA = "WRONG!";
+
+    /// <summary>Not implemented; always throws.</summary>
+    public Avro.Schema Schema => throw new NotImplementedException();
+
+    /// <summary>Not implemented; always throws.</summary>
+    public object Get(int fieldPos) => throw new NotImplementedException();
+
+    /// <summary>Not implemented; always throws.</summary>
+    public void Put(int fieldPos, object fieldValue) => throw new NotImplementedException();
+}


Reply via email to