This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9e01e5c Add JSON and Protobuf native schema support (#279)
9e01e5c is described below
commit 9e01e5c155deb0ee49353ccc800385f9f5f34e82
Author: Thorben Luepkes <[email protected]>
AuthorDate: Wed Feb 11 11:01:34 2026 +0100
Add JSON and Protobuf native schema support (#279)
Implement ISchema<T> for JSON and Protobuf message types,
closing the gap with Java and Go Pulsar clients.
- JsonSchema<T>: uses System.Text.Json for serialization with
optional JsonSerializerOptions and schema definition string
- ProtobufSchema<T>: uses Google.Protobuf (already a dependency)
with native protobuf descriptor for schema data
- Add factory methods Schema.Json<T>() and Schema.Protobuf<T>()
- Add inner exception constructor to SchemaSerializationException
- Add System.Text.Json dependency for netstandard2.0/2.1 targets
- Include 21 unit tests covering round-trip, edge cases, and
error handling
Fix #254
---
src/DotPulsar/DotPulsar.csproj | 2 +
.../Exceptions/SchemaSerializationException.cs | 2 +
src/DotPulsar/Schema.cs | 20 +++
src/DotPulsar/Schemas/JsonSchema.cs | 71 ++++++++
src/DotPulsar/Schemas/ProtobufSchema.cs | 57 +++++++
tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs | 185 +++++++++++++++++++++
.../DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs | 162 ++++++++++++++++++
.../Schemas/TestSamples/JsonModels/PersonModel.cs | 9 +-
8 files changed, 504 insertions(+), 4 deletions(-)
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 67e1b7a..80a2678 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -38,6 +38,7 @@
<PackageReference Include="System.Collections.Immutable" Version="10.0.2"
/>
<PackageReference Include="System.Diagnostics.DiagnosticSource"
Version="10.0.2" />
<PackageReference Include="System.IO.Pipelines" Version="10.0.2" />
+ <PackageReference Include="System.Text.Json" Version="10.0.2" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
@@ -45,6 +46,7 @@
<PackageReference Include="System.Collections.Immutable" Version="10.0.2"
/>
<PackageReference Include="System.Diagnostics.DiagnosticSource"
Version="10.0.2" />
<PackageReference Include="System.IO.Pipelines" Version="10.0.2" />
+ <PackageReference Include="System.Text.Json" Version="10.0.2" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
index fdb12a0..877151a 100644
--- a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
+++ b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
@@ -17,4 +17,6 @@ namespace DotPulsar.Exceptions;
public sealed class SchemaSerializationException : DotPulsarException
{
public SchemaSerializationException(string message) : base(message) { }
+
+ public SchemaSerializationException(string message, Exception
innerException) : base(message, innerException) { }
}
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
index 6b6e750..6280dd8 100644
--- a/src/DotPulsar/Schema.cs
+++ b/src/DotPulsar/Schema.cs
@@ -97,6 +97,26 @@ public static class Schema
/// </summary>
public static TimeSchema Time { get; }
+ /// <summary>
+ /// JSON schema using System.Text.Json
+ /// </summary>
+ public static JsonSchema<T> Json<T>() => new();
+
+ /// <summary>
+ /// JSON schema using System.Text.Json with custom serializer options
+ /// </summary>
+ public static JsonSchema<T> Json<T>(System.Text.Json.JsonSerializerOptions
options) => new(options);
+
+ /// <summary>
+ /// JSON schema using System.Text.Json with custom serializer options and
schema definition
+ /// </summary>
+ public static JsonSchema<T> Json<T>(System.Text.Json.JsonSerializerOptions
options, string jsonSchemaDefinition) => new(options, jsonSchemaDefinition);
+
+ /// <summary>
+ /// Protobuf native schema for classes that implement
Google.Protobuf.IMessage
+ /// </summary>
+ public static ProtobufSchema<T> Protobuf<T>() where T :
Google.Protobuf.IMessage<T>, new() => new();
+
/// <summary>
/// Avro schema for classes that use ISpecificRecord
/// </summary>
diff --git a/src/DotPulsar/Schemas/JsonSchema.cs
b/src/DotPulsar/Schemas/JsonSchema.cs
new file mode 100644
index 0000000..45f1411
--- /dev/null
+++ b/src/DotPulsar/Schemas/JsonSchema.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using System.Buffers;
+using System.Collections.Immutable;
+using System.Text;
+using System.Text.Json;
+
+/// <summary>
+/// Schema definition for JSON encoded messages.
+/// </summary>
+public sealed class JsonSchema<T> : ISchema<T>
+{
+ private readonly JsonSerializerOptions _options;
+
+ public JsonSchema()
+ {
+ _options = new JsonSerializerOptions();
+ SchemaInfo = new SchemaInfo(typeof(T).Name, Array.Empty<byte>(),
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+ }
+
+ public JsonSchema(JsonSerializerOptions options)
+ {
+ _options = options;
+ SchemaInfo = new SchemaInfo(typeof(T).Name, Array.Empty<byte>(),
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+ }
+
+ public JsonSchema(JsonSerializerOptions options, string
jsonSchemaDefinition)
+ {
+ _options = options;
+ var schemaData = Encoding.UTF8.GetBytes(jsonSchemaDefinition);
+ SchemaInfo = new SchemaInfo(typeof(T).Name, schemaData,
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+ }
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ try
+ {
+ var array = bytes.ToArray();
+ return JsonSerializer.Deserialize<T>(array, _options)
+ ?? throw new SchemaSerializationException($"Failed to
deserialize JSON to type '{typeof(T).Name}'");
+ }
+ catch (JsonException exception)
+ {
+ throw new SchemaSerializationException($"Failed to deserialize
JSON to type '{typeof(T).Name}'", exception);
+ }
+ }
+
+ public ReadOnlySequence<byte> Encode(T message)
+ {
+ var bytes = JsonSerializer.SerializeToUtf8Bytes(message, _options);
+ return new ReadOnlySequence<byte>(bytes);
+ }
+}
diff --git a/src/DotPulsar/Schemas/ProtobufSchema.cs
b/src/DotPulsar/Schemas/ProtobufSchema.cs
new file mode 100644
index 0000000..e617aab
--- /dev/null
+++ b/src/DotPulsar/Schemas/ProtobufSchema.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using Google.Protobuf;
+using System.Buffers;
+using System.Collections.Immutable;
+
+/// <summary>
+/// Schema definition for Protobuf encoded messages.
+/// </summary>
+public sealed class ProtobufSchema<T> : ISchema<T> where T :
Google.Protobuf.IMessage<T>, new()
+{
+ public ProtobufSchema()
+ {
+ var instance = new T();
+ var name = instance.Descriptor.Name;
+ var data = instance.Descriptor.File.SerializedData.ToByteArray();
+
+ SchemaInfo = new SchemaInfo(name, data, SchemaType.ProtobufNative,
ImmutableDictionary<string, string>.Empty);
+ }
+
+ public SchemaInfo SchemaInfo { get; }
+
+ public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+ {
+ try
+ {
+ var message = new T();
+ message.MergeFrom(bytes.ToArray());
+ return message;
+ }
+ catch (InvalidProtocolBufferException exception)
+ {
+ throw new Exceptions.SchemaSerializationException($"Failed to
decode Protobuf message of type '{typeof(T).Name}'", exception);
+ }
+ }
+
+ public ReadOnlySequence<byte> Encode(T message)
+ {
+ var bytes = message.ToByteArray();
+ return new ReadOnlySequence<byte>(bytes);
+ }
+}
diff --git a/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs
b/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs
new file mode 100644
index 0000000..5e30ee7
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs
@@ -0,0 +1,185 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+
+using DotPulsar.Schemas;
+using DotPulsar.Tests.Schemas.TestSamples.JsonModels;
+using System.Buffers;
+using System.Text.Json;
+
+[Trait("Category", "Unit")]
+public sealed class JsonSchemaTests
+{
+ [Fact]
+ public void
Constructor_GivenNoArguments_ShouldCreateSchemaWithDefaultOptions()
+ {
+ //Act
+ var schema = new JsonSchema<PersonModel>();
+
+ //Assert
+ schema.SchemaInfo.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void
Constructor_GivenCustomOptions_ShouldCreateSchemaWithCustomOptions()
+ {
+ //Arrange
+ var options = new JsonSerializerOptions { PropertyNamingPolicy =
JsonNamingPolicy.CamelCase };
+
+ //Act
+ var schema = new JsonSchema<PersonModel>(options);
+
+ //Assert
+ schema.SchemaInfo.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void SchemaInfo_ShouldHaveJsonSchemaType()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+
+ //Act
+ var schemaInfo = schema.SchemaInfo;
+
+ //Assert
+ schemaInfo.Type.ShouldBe(SchemaType.Json);
+ }
+
+ [Fact]
+ public void SchemaInfo_ShouldHaveCorrectName()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+
+ //Act
+ var schemaInfo = schema.SchemaInfo;
+
+ //Assert
+ schemaInfo.Name.ShouldBe("PersonModel");
+ }
+
+ [Fact]
+ public void Encode_GivenValidModel_ShouldReturnJsonBytes()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+ var model = new PersonModel { Name = "Alice", Age = 30 };
+
+ //Act
+ var bytes = schema.Encode(model);
+
+ //Assert
+ bytes.Length.ShouldBeGreaterThan(0);
+ }
+
+ [Fact]
+ public void Decode_GivenValidJsonBytes_ShouldReturnCorrectModel()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+ var expected = new PersonModel { Name = "Bob", Age = 25 };
+ var json = JsonSerializer.SerializeToUtf8Bytes(expected);
+ var bytes = new ReadOnlySequence<byte>(json);
+
+ //Act
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Name.ShouldBe(expected.Name);
+ actual.Age.ShouldBe(expected.Age);
+ }
+
+ [Fact]
+ public void EncodeDecode_GivenValidModel_ShouldRoundTrip()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+ var expected = new PersonModel { Name = "Charlie", Age = 42 };
+
+ //Act
+ var bytes = schema.Encode(expected);
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Name.ShouldBe(expected.Name);
+ actual.Age.ShouldBe(expected.Age);
+ }
+
+ [Fact]
+ public void EncodeDecode_GivenCamelCaseOptions_ShouldRoundTrip()
+ {
+ //Arrange
+ var options = new JsonSerializerOptions { PropertyNamingPolicy =
JsonNamingPolicy.CamelCase };
+ var schema = new JsonSchema<PersonModel>(options);
+ var expected = new PersonModel { Name = "Diana", Age = 35 };
+
+ //Act
+ var bytes = schema.Encode(expected);
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Name.ShouldBe(expected.Name);
+ actual.Age.ShouldBe(expected.Age);
+ }
+
+ [Fact]
+ public void Encode_GivenCamelCaseOptions_ShouldProduceCamelCaseJson()
+ {
+ //Arrange
+ var options = new JsonSerializerOptions { PropertyNamingPolicy =
JsonNamingPolicy.CamelCase };
+ var schema = new JsonSchema<PersonModel>(options);
+ var model = new PersonModel { Name = "Eve", Age = 28 };
+
+ //Act
+ var bytes = schema.Encode(model);
+ var json = System.Text.Encoding.UTF8.GetString(bytes.ToArray());
+
+ //Assert
+ json.ShouldContain("\"name\"", Case.Sensitive);
+ json.ShouldContain("\"age\"", Case.Sensitive);
+ json.ShouldNotContain("\"Name\"", Case.Sensitive);
+ json.ShouldNotContain("\"Age\"", Case.Sensitive);
+ }
+
+ [Fact]
+ public void Constructor_GivenSchemaDefinition_ShouldUseItAsSchemaData()
+ {
+ //Arrange
+ var definition = "{\"type\":\"record\",\"name\":\"PersonModel\"}";
+ var options = new JsonSerializerOptions();
+
+ //Act
+ var schema = new JsonSchema<PersonModel>(options, definition);
+
+ //Assert
+ var schemaData =
System.Text.Encoding.UTF8.GetString(schema.SchemaInfo.Data);
+ schemaData.ShouldBe(definition);
+ }
+
+ [Fact]
+ public void
Decode_GivenEmptyBytes_ShouldThrowSchemaSerializationException()
+ {
+ //Arrange
+ var schema = new JsonSchema<PersonModel>();
+ var bytes = new ReadOnlySequence<byte>(Array.Empty<byte>());
+
+ //Act
+ var exception = Record.Exception(() => schema.Decode(bytes));
+
+ //Assert
+
exception.ShouldBeOfType<DotPulsar.Exceptions.SchemaSerializationException>();
+ }
+}
diff --git a/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs
b/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs
new file mode 100644
index 0000000..2490cd4
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+
+using DotPulsar.Schemas;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using System.Buffers;
+
+[Trait("Category", "Unit")]
+public sealed class ProtobufSchemaTests
+{
+ [Fact]
+ public void Constructor_GivenValidProtobufMessage_ShouldCreateSchema()
+ {
+ //Act
+ var schema = new ProtobufSchema<StringValue>();
+
+ //Assert
+ schema.SchemaInfo.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public void SchemaInfo_ShouldHaveProtobufNativeSchemaType()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+
+ //Act
+ var schemaInfo = schema.SchemaInfo;
+
+ //Assert
+ schemaInfo.Type.ShouldBe(SchemaType.ProtobufNative);
+ }
+
+ [Fact]
+ public void SchemaInfo_ShouldHaveCorrectName()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+
+ //Act
+ var schemaInfo = schema.SchemaInfo;
+
+ //Assert
+ schemaInfo.Name.ShouldBe("StringValue");
+ }
+
+ [Fact]
+ public void SchemaInfo_ShouldHaveNonEmptySchemaData()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+
+ //Act
+ var schemaInfo = schema.SchemaInfo;
+
+ //Assert
+ schemaInfo.Data.Length.ShouldBeGreaterThan(0);
+ }
+
+ [Fact]
+ public void Encode_GivenValidMessage_ShouldReturnBytes()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+ var message = new StringValue { Value = "Hello Pulsar" };
+
+ //Act
+ var bytes = schema.Encode(message);
+
+ //Assert
+ bytes.Length.ShouldBeGreaterThan(0);
+ }
+
+ [Fact]
+ public void Decode_GivenValidBytes_ShouldReturnCorrectMessage()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+ var expected = new StringValue { Value = "Hello Pulsar" };
+ var protobufBytes = expected.ToByteArray();
+ var bytes = new ReadOnlySequence<byte>(protobufBytes);
+
+ //Act
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Value.ShouldBe(expected.Value);
+ }
+
+ [Fact]
+ public void EncodeDecode_GivenValidMessage_ShouldRoundTrip()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+ var expected = new StringValue { Value = "Round trip test" };
+
+ //Act
+ var bytes = schema.Encode(expected);
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Value.ShouldBe(expected.Value);
+ }
+
+ [Fact]
+ public void EncodeDecode_GivenInt32Value_ShouldRoundTrip()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<Int32Value>();
+ var expected = new Int32Value { Value = 42 };
+
+ //Act
+ var bytes = schema.Encode(expected);
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Value.ShouldBe(expected.Value);
+ }
+
+ [Fact]
+ public void EncodeDecode_GivenEmptyMessage_ShouldRoundTrip()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+ var expected = new StringValue { Value = "" };
+
+ //Act
+ var bytes = schema.Encode(expected);
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Value.ShouldBe(expected.Value);
+ }
+
+ [Fact]
+ public void Decode_GivenEmptyBytes_ShouldReturnDefaultMessage()
+ {
+ //Arrange
+ var schema = new ProtobufSchema<StringValue>();
+ var bytes = new ReadOnlySequence<byte>(Array.Empty<byte>());
+
+ //Act
+ var actual = schema.Decode(bytes);
+
+ //Assert
+ actual.Value.ShouldBe("");
+ }
+}
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
b/tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
similarity index 75%
copy from src/DotPulsar/Exceptions/SchemaSerializationException.cs
copy to tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
index fdb12a0..a52d04c 100644
--- a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,9 +12,10 @@
* limitations under the License.
*/
-namespace DotPulsar.Exceptions;
+namespace DotPulsar.Tests.Schemas.TestSamples.JsonModels;
-public sealed class SchemaSerializationException : DotPulsarException
+public sealed class PersonModel
{
- public SchemaSerializationException(string message) : base(message) { }
+ public string Name { get; set; } = string.Empty;
+ public int Age { get; set; }
}