This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 15ae5a8 Avro ISpecificRecord schema support (#253)
15ae5a8 is described below
commit 15ae5a8216b0865094abfd8e5d73055acd4a639c
Author: Jon Klinaku <[email protected]>
AuthorDate: Tue Feb 25 09:37:34 2025 +0100
Avro ISpecificRecord schema support (#253)
* added Schema to CommandProducer
* Added AvroISpecificRecord Schema Support when generated by AvroGen tool
* Modified from library to reflection Avro schema and added schema tests
* added some integration tests and modified consumer to accept schema in
the Subscribe command
* removed warnings in DotPulsar AvroISpecificRecordSchema.cs
* added static constructor to AvroISpecificSchema and some minor changes!
* removed unused "_AvroAssemblyTypeInfos" in AvroISpecificRecordSchema
---
src/DotPulsar/Exceptions/SchemaException.cs | 29 +++
src/DotPulsar/Internal/Consumer.cs | 3 +-
.../Internal/Extensions/TypeExtensions.cs | 34 +++
src/DotPulsar/Internal/ProducerChannelFactory.cs | 6 +-
src/DotPulsar/Schema.cs | 7 +-
src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs | 257 +++++++++++++++++++++
tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 1 +
tests/DotPulsar.Tests/IntegrationFixture.cs | 19 ++
tests/DotPulsar.Tests/Internal/ConsumerTests.cs | 57 ++++-
tests/DotPulsar.Tests/Internal/ProducerTests.cs | 51 +++-
.../Schemas/AvroISpecificRecordSchemaTests.cs | 61 +++++
.../TestSamples/AvroModels/AvroBlankSampleModel.cs | 20 ++
.../TestSamples/AvroModels/AvroSampleModel.cs | 71 ++++++
.../AvroSampleModelWithWrongSCHEMAField.cs | 34 +++
14 files changed, 628 insertions(+), 22 deletions(-)
diff --git a/src/DotPulsar/Exceptions/SchemaException.cs
b/src/DotPulsar/Exceptions/SchemaException.cs
new file mode 100644
index 0000000..7561c4d
--- /dev/null
+++ b/src/DotPulsar/Exceptions/SchemaException.cs
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions;
+
+/// <summary>
+/// Any error related to schemas
+/// </summary>
+public sealed class SchemaException : DotPulsarException
+{
+ public SchemaException(string message) : base(message)
+ {
+ }
+
+ public SchemaException(string message, Exception innerException) :
base(message, innerException)
+ {
+ }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 142d63b..4a0eec4 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -434,7 +434,8 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
ReplicateSubscriptionState =
_consumerOptions.ReplicateSubscriptionState,
Subscription = _consumerOptions.SubscriptionName,
Topic = topic,
- Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
+ Type = (CommandSubscribe.SubType)
_consumerOptions.SubscriptionType,
+ Schema = _consumerOptions.Schema.SchemaInfo.PulsarSchema
};
foreach (var property in _consumerOptions.SubscriptionProperties)
diff --git a/src/DotPulsar/Internal/Extensions/TypeExtensions.cs
b/src/DotPulsar/Internal/Extensions/TypeExtensions.cs
new file mode 100644
index 0000000..d9af663
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/TypeExtensions.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions;
+
+public static class TypeExtensions
+{
+ public static bool ImplementsBaseTypeFullName(this Type typeToCheck,
string fullNameToCheckAgainst)
+ {
+ Type? tempType = typeToCheck;
+ while (true)
+ {
+ if (tempType is null)
+ break;
+ if (string.IsNullOrEmpty(tempType.FullName))
+ continue;
+ if (tempType.FullName.Equals(fullNameToCheckAgainst))
+ return true;
+ tempType = tempType.BaseType;
+ }
+ return false;
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index 70bef2c..84b4e11 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -42,19 +42,19 @@ public sealed class ProducerChannelFactory :
IProducerChannelFactory
_correlationId = correlationId;
_eventRegister = eventRegister;
_connectionPool = connectionPool;
-
+ _schema = schemaInfo.PulsarSchema;
_commandProducer = new CommandProducer
{
ProducerName = producerName,
ProducerAccessMode = producerAccessMode,
- Topic = topic
+ Topic = topic,
+ Schema = _schema
};
if (properties is not null)
_commandProducer.Metadatas.AddRange(properties.Select(x => new
KeyValue { Key = x.Key, Value = x.Value }));
_compressorFactory = compressorFactory;
- _schema = schemaInfo.PulsarSchema;
}
public async Task<IProducerChannel> Create(CancellationToken
cancellationToken)
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
index 8aa8375..0d5a678 100644
--- a/src/DotPulsar/Schema.cs
+++ b/src/DotPulsar/Schema.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -96,4 +96,9 @@ public static class Schema
/// Time schema using TimeSpan.
/// </summary>
public static TimeSchema Time { get; }
+ /// <summary>
+ /// Avro schema for classes that use ISpecificRecord
+ /// </summary>
+ public static AvroISpecificRecordSchema<T> AvroISpecificRecord<T>() =>
+ new AvroISpecificRecordSchema<T>();
}
diff --git a/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
new file mode 100644
index 0000000..86d2d7a
--- /dev/null
+++ b/src/DotPulsar/Schemas/AvroISpecificRecordSchema.cs
@@ -0,0 +1,257 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using DotPulsar.Internal.Extensions;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+public sealed class AvroISpecificRecordSchema<T> : ISchema<T>
+{
+ private static readonly Type _typeT;
+ private static readonly object _avroSchema;
+ private static readonly MethodInfo _avroWriterWriteMethod;
+ private static readonly MethodInfo _avroReaderReadMethod;
+ private static readonly TypeInfo _binaryEncoderTypeInfo;
+ private static readonly TypeInfo _binaryDecoderTypeInfo;
+ private static readonly Type _avroWriterTypeInfo;
+ private static readonly Type _avroReaderTypeInfo;
+ private static readonly SchemaInfo _schemaInfo;
+ private static readonly Exception _constructorException;
+
+ private readonly object _avroWriter;
+ private readonly object _avroReader;
+
+ public SchemaInfo SchemaInfo { get => _schemaInfo; }
+#pragma warning disable CS8618 // Supressed because if there is an init error
the non-static constructor will throw it instead. This is done in case of if
there is a wrong implementation of ISpecificRecord in T in order not to stop
the whole runtime.
+ static AvroISpecificRecordSchema()
+#pragma warning restore CS8618 // Supressed because if there is an init error
the non-static constructor will throw it instead. This is done in case of if
there is a wrong implementation of ISpecificRecord in T in order not to stop
the whole runtime.
+ {
+ const string schemaFullName = "Avro.Schema";
+ const string ISpecificRecordFullName = "Avro.Specific.ISpecificRecord";
+ _typeT = typeof(T);
+ string SchemaName;
+ string SchemaData;
+
+ try
+ {
+ var assembly = Assembly.Load("Avro");
+ if (!_typeT.GetInterfaces().Any(i => i.FullName ==
ISpecificRecordFullName))
+ throw new SchemaException(string.Format("The type {0} must
implement {1}", _typeT, ISpecificRecordFullName));
+ _avroSchema = _typeT.GetField("_SCHEMA")?.GetValue(null) ??
+ throw new SchemaException(string.Format("The static field
named '_SCHEMA' must not be null in type: {0}", _typeT));
+ Type avroSchemaType = _avroSchema.GetType();
+ if (!avroSchemaType.ImplementsBaseTypeFullName(schemaFullName))
+ {
+ throw new SchemaException(string.Format("field '_SCHEMA' must
be of type {0}", schemaFullName));
+ }
+ SchemaName = (string)
(avroSchemaType.GetProperty("Name")?.GetValue(_avroSchema) ?? string.Empty);
+ SchemaData = (string) (avroSchemaType
+ .GetMethod("ToString", Type.EmptyTypes)
+ ?.Invoke(_avroSchema, null) ??
+ throw new SchemaException(string.Format("Schema toString()
must not return null for type {0}", _typeT)));
+ TryLoadStatic(out Type avroWriterType, out Type avroReaderType,
out TypeInfo binaryEncoderType, out TypeInfo binaryDecoderType, out MethodInfo
avroWriterMethod, out MethodInfo avroReaderMethod);
+ _avroWriterTypeInfo = avroWriterType;
+ _avroReaderTypeInfo = avroReaderType;
+ _binaryEncoderTypeInfo = binaryEncoderType;
+ _binaryDecoderTypeInfo = binaryDecoderType;
+ _avroWriterWriteMethod = avroWriterMethod;
+ _avroReaderReadMethod = avroReaderMethod;
+ _schemaInfo = new SchemaInfo(SchemaName,
+ Encoding.UTF8.GetBytes(SchemaData),
+ SchemaType.Avro,
+ new Dictionary<string, string>());
+ }
+ catch (Exception e)
+ {
+
+ _constructorException = e;
+ }
+
+ }
+ public AvroISpecificRecordSchema()
+ {
+ if (_constructorException != null)
+ throw _constructorException;
+ TryLoad(out object avroWriter, out object avroReader);
+ _avroWriter = avroWriter;
+ _avroReader = avroReader;
+ }
+ public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ using var stream = new MemoryStream(bytes.ToArray());
+ T? def = default;
+ return (T) (_avroReaderReadMethod.Invoke(_avroReader, [def,
GetBinaryDecoder(stream)]) ?? throw new
SchemaSerializationException(string.Format("Could not Deserialize object of
type {0}", _typeT)));
+ }
+
+ public ReadOnlySequence<byte> Encode(T message)
+ {
+ using var stream = new MemoryStream();
+ _avroWriterWriteMethod.Invoke(_avroWriter, [message,
GetBinaryEncoder(stream)]);
+ return new ReadOnlySequence<byte>(stream.ToArray());
+ }
+ private static void TryLoadStatic(out Type avroWriter,
+ out Type avroReader,
+ out TypeInfo binaryEncoderType,
+ out TypeInfo binaryDecoderType,
+ out MethodInfo avroWriterMethod,
+ out MethodInfo avroReaderMethod)
+ {
+ var assembly = Assembly.Load("Avro");
+ var definedTypes = assembly.DefinedTypes.ToArray();
+ avroWriter = LoadSpecificDatumWriterType(definedTypes);
+ avroReader = LoadSpecificDatumReaderType(definedTypes);
+ binaryEncoderType = LoadBinaryEncoderType(definedTypes);
+ binaryDecoderType = LoadBinaryDecoderType(definedTypes);
+ avroWriterMethod =
LoadSpecificDatumWriterMethod(avroWriter.GetMethods());
+ avroReaderMethod =
LoadSpecificDatumReaderMethod(avroReader.GetMethods());
+ }
+ private void TryLoad(out object avroWriter, out object avroReader)
+ {
+ avroWriter = LoadSpecificDatumWriter();
+ avroReader = LoadSpecificDatumReader();
+ }
+
+ private static Type LoadSpecificDatumWriterType(IEnumerable<TypeInfo>
types)
+ {
+ const string fullName = "Avro.Specific.SpecificDatumWriter`1";
+ foreach (var type in types)
+ {
+ if (type.FullName is null || !type.FullName.Equals(fullName))
+ continue;
+ if (type.IsPublic && type.IsClass)
+ {
+ return type.MakeGenericType(typeof(T));
+ }
+ break;
+ }
+ throw new SchemaException($"{fullName} as a public class was not
found");
+ }
+ private static Type LoadSpecificDatumReaderType(IEnumerable<TypeInfo>
types)
+ {
+ const string fullName = "Avro.Specific.SpecificDatumReader`1";
+ foreach (var type in types)
+ {
+ if (type.FullName is null || !type.FullName.Equals(fullName))
+ continue;
+ if (type.IsPublic && type.IsClass)
+ {
+ return type.MakeGenericType(typeof(T));
+ }
+ break;
+ }
+ throw new SchemaException($"{fullName} as a public class was not
found");
+ }
+ private object LoadSpecificDatumWriter() =>
+ Activator.CreateInstance(_avroWriterTypeInfo, _avroSchema) ?? throw
new SchemaException("Could not load SpecificDatumWriter");
+ private object LoadSpecificDatumReader() =>
+ Activator.CreateInstance(_avroReaderTypeInfo, _avroSchema,
_avroSchema) ?? throw new SchemaException("Could not load SpecificDatumReader");
+
+ private static MethodInfo
LoadSpecificDatumReaderMethod(IEnumerable<MethodInfo> methods)
+ {
+ const string name = "Read";
+ const string secondParamFullname = "Avro.IO.Decoder";
+ foreach (var method in methods)
+ {
+ if (method.Name != name || method.ReturnType != typeof(T))
+ continue;
+
+ var parameters = method.GetParameters();
+ if (parameters.Length != 2)
+ continue;
+
+ var param1Fullname = parameters[1].ParameterType.FullName;
+ if (param1Fullname == null)
+ continue;
+
+ if (parameters[0].ParameterType != typeof(T) ||
+ !param1Fullname.Equals(secondParamFullname))
+ continue;
+
+ return method;
+ }
+ throw new SchemaException($"A method with the name '{name}' matching
the delegate was not found");
+ }
+ private static MethodInfo
LoadSpecificDatumWriterMethod(IEnumerable<MethodInfo> methods)
+ {
+ const string name = "Write";
+ const string secondParamFullname = "Avro.IO.Encoder";
+ foreach (var method in methods)
+ {
+ if (method.Name != name || method.ReturnType != typeof(void))
+ continue;
+
+ var parameters = method.GetParameters();
+ if (parameters.Length != 2)
+ continue;
+
+ var param1Fullname = parameters[1].ParameterType.FullName;
+ if (param1Fullname == null)
+ continue;
+
+ if (parameters[0].ParameterType != typeof(T) ||
+ !param1Fullname.Equals(secondParamFullname))
+ continue;
+
+ return method;
+ }
+
+ throw new SchemaException($"A method with the name '{name}' matching
the delegate was not found");
+ }
+ private static TypeInfo LoadBinaryEncoderType(IEnumerable<TypeInfo> types)
+ {
+ const string fullName = "Avro.IO.BinaryEncoder";
+
+ foreach (var type in types)
+ {
+ if (type.FullName is null || !type.FullName.Equals(fullName))
+ continue;
+
+ if (type.IsPublic && type.IsClass)
+ return type;
+
+ break;
+ }
+
+ throw new SchemaException($"{fullName} as a public class was not
found");
+ }
+ private static TypeInfo LoadBinaryDecoderType(IEnumerable<TypeInfo> types)
+ {
+ const string fullName = "Avro.IO.BinaryDecoder";
+
+ foreach (var type in types)
+ {
+ if (type.FullName is null || !type.FullName.Equals(fullName))
+ continue;
+
+ if (type.IsPublic && type.IsClass)
+ return type;
+
+ break;
+ }
+
+ throw new SchemaException($"{fullName} as a public class was not
found");
+ }
+ private object GetBinaryEncoder(MemoryStream stream) =>
+ Activator.CreateInstance(_binaryEncoderTypeInfo, stream) ?? throw new
SchemaException("There was a problem while instanciating BinaryEncoder");
+
+ private object GetBinaryDecoder(MemoryStream stream) =>
+ Activator.CreateInstance(_binaryDecoderTypeInfo, stream) ?? throw new
SchemaException("There was a problem while instanciating BinaryDecoder");
+
+}
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index f244f99..20b8926 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -15,6 +15,7 @@
</ItemGroup>
<ItemGroup>
+ <PackageReference Include="Apache.Avro" Version="1.12.0" />
<PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.18.1" />
<PackageReference Include="AutoFixture.Xunit2" Version="4.18.1" />
<PackageReference Include="coverlet.collector" Version="6.0.4">
diff --git a/tests/DotPulsar.Tests/IntegrationFixture.cs
b/tests/DotPulsar.Tests/IntegrationFixture.cs
index 94d3886..e7cedc3 100644
--- a/tests/DotPulsar.Tests/IntegrationFixture.cs
+++ b/tests/DotPulsar.Tests/IntegrationFixture.cs
@@ -18,6 +18,8 @@ using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using DotPulsar.Abstractions;
+using System.Text;
+using System.Text.Json;
using Testcontainers.Pulsar;
using Toxiproxy.Net;
using Xunit.Abstractions;
@@ -170,6 +172,23 @@ public class IntegrationFixture : IAsyncLifetime
if (result.ExitCode != 0)
throw new Exception($"Could not create the partitioned topic:
{result.Stderr}");
}
+ public async Task AddSchemaToExistingTopic(string topic, SchemaInfo
pulsarSchemaInfo, CancellationToken cancellationToken)
+ {
+ var schDef = new
+ {
+ type = pulsarSchemaInfo.Type.ToString().ToUpper(),
+ schema = Encoding.UTF8.GetString(pulsarSchemaInfo.Data),
+ properties = new Dictionary<string, string>()
+ };
+ string schemaFilename = $"{Guid.NewGuid().ToString()}.sch";
+ await
_pulsarCluster.CopyAsync(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(schDef)),
$"pulsar/{schemaFilename}", ct: cancellationToken);
+ string arguments = $"bin/pulsar-admin schemas upload --filename
{schemaFilename} {topic}";
+ var result = await _pulsarCluster.ExecAsync(["/bin/bash", "-c",
arguments], cancellationToken);
+
+ if (result.ExitCode != 0)
+ throw new Exception($"Could not upload a schema to topic:
{result.Stderr}");
+
+ }
public async Task<IAsyncDisposable> DisableThePulsarConnection()
{
diff --git a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
index ea1f080..b379620 100644
--- a/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ConsumerTests.cs
@@ -17,6 +17,7 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
using DotPulsar.Extensions;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
using System.Text.RegularExpressions;
using Xunit.Abstractions;
@@ -275,6 +276,35 @@ public sealed class ConsumerTests : IDisposable
//Assert
exception.ShouldBeOfType<ConsumerFaultedException>();
}
+ [Fact]
+ public async Task
Receive_WhenReceivingToTopicWithSchemaAndReceiverHasWrongSchema_ShouldThrowException()
+ {
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
+ var client = CreateClient();
+ await using var consumer = CreateConsumer(client, topicName,
Schema.String);
+ await using var producer = CreateProducer(client, topicName,
pulsarSchema);
+ await producer.Send(new AvroSampleModel(), _cts.Token);
+ var exception = await Record.ExceptionAsync(consumer.Receive().AsTask);
+ exception.ShouldBeOfType<IncompatibleSchemaException>();
+ }
+ [Fact]
+ public async Task
Receive_WhenReceivingToTopicWithSchemaAndReceiverHasRightSchema_ShouldBeAbleToRecieve()
+ {
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
+ var client = CreateClient();
+ await using var consumer = CreateConsumer(client, topicName,
pulsarSchema);
+ await using var producer = CreateProducer(client, topicName,
pulsarSchema);
+ var modelProduced = new AvroSampleModel();
+ await producer.Send(modelProduced, _cts.Token);
+ var consumed = await consumer.Receive(_cts.Token);
+ consumed.Value().Name.ShouldBe(modelProduced.Name);
+ consumed.Value().Surname.ShouldBe(modelProduced.Surname);
+ consumed.Value().Age.ShouldBe(modelProduced.Age);
+ }
[Fact]
public async Task
Connectivity_WhenInitiallyConnectedWithNoMessagesThenGoesDown_ShouldBeAbleToReceiveWhenUpAgain()
@@ -408,19 +438,30 @@ public sealed class ConsumerTests : IDisposable
private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient,
string topicName)
- => pulsarClient.NewProducer(Schema.String)
+ private IProducer<T> CreateProducer<T>(
+ IPulsarClient pulsarClient,
+ string topicName,
+ ISchema<T> schema)
+ => pulsarClient.NewProducer(schema)
.Topic(topicName)
.StateChangedHandler(_testOutputHelper.Log)
.Create();
+ private IProducer<string> CreateProducer(
+ IPulsarClient pulsarClient,
+ string topicName,
+ ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+ => CreateProducer(pulsarClient, topicName, Schema.String);
+
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string topicName)
- => pulsarClient.NewConsumer(Schema.String)
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName(CreateSubscriptionName())
- .Topic(topicName)
- .StateChangedHandler(_testOutputHelper.Log)
- .Create();
+ => CreateConsumer(pulsarClient, topicName, Schema.String);
+ private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string
topicName, ISchema<T> schema)
+ => pulsarClient.NewConsumer(schema)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
+ .Create();
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
IEnumerable<string> topics)
=> pulsarClient.NewConsumer(Schema.String)
diff --git a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
index bcb6e09..1554cb6 100644
--- a/tests/DotPulsar.Tests/Internal/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ProducerTests.cs
@@ -16,6 +16,8 @@ namespace DotPulsar.Tests.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+using System.Text;
using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
@@ -267,7 +269,28 @@ public sealed class ProducerTests : IDisposable
foundNonNegativeOne.ShouldBeTrue();
}
-
+ [Fact]
+ public async Task
Send_WhenProducingToTopicWithSchemaAndProducerHasWrongSchema_ShouldThrowException()
+ {
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
+ var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName,
Schema.ByteSequence);
+ var exception = await
Record.ExceptionAsync(producer.Send(Encoding.UTF8.GetBytes("test"),
_cts.Token).AsTask);
+ exception.ShouldBeAssignableTo<Exception>();
+ }
+ [Fact]
+ public async Task
Send_WhenProducingToTopicWithSchemaAndProducerHasRightSchema_ShouldBeAbleToSend()
+ {
+ var topicName = await _fixture.CreateTopic(_cts.Token);
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ await _fixture.AddSchemaToExistingTopic(topicName,
pulsarSchema.SchemaInfo, _cts.Token);
+ var client = CreateClient();
+ await using var producer = CreateProducer(client, topicName,
pulsarSchema);
+ var exception = await Record.ExceptionAsync(producer.Send(new
AvroSampleModel(), _cts.Token).AsTask);
+ exception.ShouldBeNull();
+ }
[Fact]
public async Task
Connectivity_WhenConnectionIsInitiallyUpAndGoesDown_ShouldBeAbleToSendWhileDown()
{
@@ -394,23 +417,33 @@ public sealed class ProducerTests : IDisposable
private static string CreateSubscriptionName() =>
$"subscription-{Guid.NewGuid():N}";
- private IProducer<string> CreateProducer(
+
+ private IProducer<T> CreateProducer<T>(
IPulsarClient pulsarClient,
string topicName,
+ ISchema<T> schema,
ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
- => pulsarClient.NewProducer(Schema.String)
+ => pulsarClient.NewProducer(schema)
.Topic(topicName)
.ProducerAccessMode(producerAccessMode)
.StateChangedHandler(_testOutputHelper.Log)
.Create();
+ private IProducer<string> CreateProducer(
+ IPulsarClient pulsarClient,
+ string topicName,
+ ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared)
+ => CreateProducer(pulsarClient, topicName, Schema.String,
producerAccessMode);
+
private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient,
string topicName)
- => pulsarClient.NewConsumer(Schema.String)
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName(CreateSubscriptionName())
- .Topic(topicName)
- .StateChangedHandler(_testOutputHelper.Log)
- .Create();
+ => CreateConsumer(pulsarClient, topicName, Schema.String);
+ private IConsumer<T> CreateConsumer<T>(IPulsarClient pulsarClient, string
topicName, ISchema<T> schema)
+ => pulsarClient.NewConsumer(schema)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName(CreateSubscriptionName())
+ .Topic(topicName)
+ .StateChangedHandler(_testOutputHelper.Log)
+ .Create();
private IPulsarClient CreateClient()
=> PulsarClient
diff --git a/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
new file mode 100644
index 0000000..d702470
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/AvroISpecificRecordSchemaTests.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+using DotPulsar.Exceptions;
+using DotPulsar.Schemas;
+using DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+using System.Buffers;
+
+[Trait("Category", "Unit")]
+public sealed class AvroISpecificRecordSchemaTests
+{
+
+ [Fact]
+ public void Schema_ClassNotImplementingISpecific_ShouldThrowException()
+ {
+ var ex =
Record.Exception(Schema.AvroISpecificRecord<AvroBlankSampleModel>);
+ ex.ShouldBeOfType<SchemaException>();
+ }
+ [Fact]
+ public void Schema_ClassStaticSchemaFIeldWrongType_ShouldThrowException()
+ {
+ var ex =
Record.Exception(Schema.AvroISpecificRecord<AvroSampleModelWithWrongSCHEMAField>);
+ ex.ShouldBeOfType<SchemaException>();
+ }
+ [Fact]
+ public void
Schema_ClassImplementsISpecificRecordCorrectly_ShouldReturnSchema()
+ {
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+
pulsarSchema.ShouldBeOfType<AvroISpecificRecordSchema<AvroSampleModel>>();
+ }
+ [Fact]
+ public void
Schema_GivenDataAndImplementsEncodeProperly_ShouldReturnReadOnlySequenceOfBytes()
+ {
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ var res = pulsarSchema.Encode(new AvroSampleModel() { Name = "Jon",
Surname = "Klinaku", Age = 27 });
+ res.ShouldBeOfType<ReadOnlySequence<byte>>();
+ }
+ [Fact]
+ public void
Schema_GivenDataAndImplementsDecodeProperly_ShouldReturnCorrectObject()
+ {
+ var pulsarSchema = Schema.AvroISpecificRecord<AvroSampleModel>();
+ ReadOnlySequence<byte> bytes = new ReadOnlySequence<byte>([16, 77, 97,
114, 105, 103, 111, 110, 97, 8, 67, 101, 116, 97, 58]);
+ var res = pulsarSchema.Decode(bytes);
+ res.ShouldBeOfType<AvroSampleModel>();
+ res.Name.ShouldBe("Marigona");
+ res.Surname.ShouldBe("Ceta");
+ res.Age.ShouldBe(29);
+ }
+}
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
new file mode 100644
index 0000000..a16a4c4
--- /dev/null
+++
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroBlankSampleModel.cs
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+
+public class AvroBlankSampleModel
+{
+}
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
new file mode 100644
index 0000000..4ec16bb
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModel.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+using Avro.Specific;
+using System;
+
+
+public class AvroSampleModel : ISpecificRecord
+{
+ public static readonly Avro.Schema _SCHEMA = Avro.Schema.Parse(@"
+ {
+ ""type"": ""record"",
+ ""name"": ""AvroSampleModel"",
+ ""fields"": [
+ { ""name"": ""Name"", ""type"": ""string"" },
+ { ""name"": ""Surname"", ""type"": ""string"" },
+ { ""name"": ""Age"", ""type"": ""int"" }
+ ]
+ }");
+ public AvroSampleModel()
+ {
+ Name = "Jon";
+ Surname = "Klinaku";
+ Age = 27;
+ }
+ public virtual Avro.Schema Schema => _SCHEMA;
+ public string Name { get; set; }
+ public string Surname { get; set; }
+ public int Age { get; set; }
+ public object Get(int fieldPos)
+ {
+ return fieldPos switch
+ {
+ 0 => Name,
+ 1 => Surname,
+ 2 => Age,
+ _ => throw new ArgumentOutOfRangeException(nameof(fieldPos),
"Invalid field position")
+ };
+ }
+
+ public void Put(int fieldPos, object value)
+ {
+ switch (fieldPos)
+ {
+ case 0:
+ Name = value as string ?? throw new ArgumentException("Name
must be a string");
+ break;
+ case 1:
+ Surname = value as string ?? throw new ArgumentException("Name
must be a string");
+ break;
+ case 2:
+ Age = value is int intValue ? intValue : throw new
ArgumentException("Age must be an int");
+ break;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(fieldPos),
"Invalid field position");
+ }
+ }
+}
diff --git
a/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
new file mode 100644
index 0000000..24a3c68
--- /dev/null
+++
b/tests/DotPulsar.Tests/Schemas/TestSamples/AvroModels/AvroSampleModelWithWrongSCHEMAField.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas.TestSamples.AvroModels;
+
+using Avro.Specific;
+using System;
+
+public class AvroSampleModelWithWrongSCHEMAField : ISpecificRecord
+{
+ public static string _SCHEMA = "WRONG!";
+ public Avro.Schema Schema => throw new NotImplementedException();
+
+ public object Get(int fieldPos)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void Put(int fieldPos, object fieldValue)
+ {
+ throw new NotImplementedException();
+ }
+}