This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e01e5c  Add JSON and Protobuf native schema support (#279)
9e01e5c is described below

commit 9e01e5c155deb0ee49353ccc800385f9f5f34e82
Author: Thorben Luepkes <[email protected]>
AuthorDate: Wed Feb 11 11:01:34 2026 +0100

    Add JSON and Protobuf native schema support (#279)
    
    Implement ISchema<T> for JSON and Protobuf message types,
    closing the gap with Java and Go Pulsar clients.
    
    - JsonSchema<T>: uses System.Text.Json for serialization with
      optional JsonSerializerOptions and schema definition string
    - ProtobufSchema<T>: uses Google.Protobuf (already a dependency)
      with native protobuf descriptor for schema data
    - Add factory methods Schema.Json<T>() and Schema.Protobuf<T>()
    - Add inner exception constructor to SchemaSerializationException
    - Add System.Text.Json dependency for netstandard2.0/2.1 targets
    - Include 21 unit tests covering round-trip, edge cases, and
      error handling
    
    Fix #254
---
 src/DotPulsar/DotPulsar.csproj                     |   2 +
 .../Exceptions/SchemaSerializationException.cs     |   2 +
 src/DotPulsar/Schema.cs                            |  20 +++
 src/DotPulsar/Schemas/JsonSchema.cs                |  71 ++++++++
 src/DotPulsar/Schemas/ProtobufSchema.cs            |  57 +++++++
 tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs   | 185 +++++++++++++++++++++
 .../DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs | 162 ++++++++++++++++++
 .../Schemas/TestSamples/JsonModels/PersonModel.cs  |   9 +-
 8 files changed, 504 insertions(+), 4 deletions(-)

diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 67e1b7a..80a2678 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -38,6 +38,7 @@
     <PackageReference Include="System.Collections.Immutable" Version="10.0.2" 
/>
     <PackageReference Include="System.Diagnostics.DiagnosticSource" 
Version="10.0.2" />
     <PackageReference Include="System.IO.Pipelines" Version="10.0.2" />
+    <PackageReference Include="System.Text.Json" Version="10.0.2" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
@@ -45,6 +46,7 @@
     <PackageReference Include="System.Collections.Immutable" Version="10.0.2" 
/>
     <PackageReference Include="System.Diagnostics.DiagnosticSource" 
Version="10.0.2" />
     <PackageReference Include="System.IO.Pipelines" Version="10.0.2" />
+    <PackageReference Include="System.Text.Json" Version="10.0.2" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs 
b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
index fdb12a0..877151a 100644
--- a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
+++ b/src/DotPulsar/Exceptions/SchemaSerializationException.cs
@@ -17,4 +17,6 @@ namespace DotPulsar.Exceptions;
 public sealed class SchemaSerializationException : DotPulsarException
 {
     public SchemaSerializationException(string message) : base(message) { }
+
+    public SchemaSerializationException(string message, Exception 
innerException) : base(message, innerException) { }
 }
diff --git a/src/DotPulsar/Schema.cs b/src/DotPulsar/Schema.cs
index 6b6e750..6280dd8 100644
--- a/src/DotPulsar/Schema.cs
+++ b/src/DotPulsar/Schema.cs
@@ -97,6 +97,26 @@ public static class Schema
     /// </summary>
     public static TimeSchema Time { get; }
 
+    /// <summary>
+    /// JSON schema using System.Text.Json
+    /// </summary>
+    public static JsonSchema<T> Json<T>() => new();
+
+    /// <summary>
+    /// JSON schema using System.Text.Json with custom serializer options
+    /// </summary>
+    public static JsonSchema<T> Json<T>(System.Text.Json.JsonSerializerOptions 
options) => new(options);
+
+    /// <summary>
+    /// JSON schema using System.Text.Json with custom serializer options and 
schema definition
+    /// </summary>
+    public static JsonSchema<T> Json<T>(System.Text.Json.JsonSerializerOptions 
options, string jsonSchemaDefinition) => new(options, jsonSchemaDefinition);
+
+    /// <summary>
+    /// Protobuf native schema for classes that implement 
Google.Protobuf.IMessage
+    /// </summary>
+    public static ProtobufSchema<T> Protobuf<T>() where T : 
Google.Protobuf.IMessage<T>, new() => new();
+
     /// <summary>
     /// Avro schema for classes that use ISpecificRecord
     /// </summary>
diff --git a/src/DotPulsar/Schemas/JsonSchema.cs 
b/src/DotPulsar/Schemas/JsonSchema.cs
new file mode 100644
index 0000000..45f1411
--- /dev/null
+++ b/src/DotPulsar/Schemas/JsonSchema.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+using System.Buffers;
+using System.Collections.Immutable;
+using System.Text;
+using System.Text.Json;
+
+/// <summary>
+/// Schema definition for JSON encoded messages.
+/// </summary>
+public sealed class JsonSchema<T> : ISchema<T>
+{
+    private readonly JsonSerializerOptions _options;
+
+    public JsonSchema()
+    {
+        _options = new JsonSerializerOptions();
+        SchemaInfo = new SchemaInfo(typeof(T).Name, Array.Empty<byte>(), 
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+    }
+
+    public JsonSchema(JsonSerializerOptions options)
+    {
+        _options = options;
+        SchemaInfo = new SchemaInfo(typeof(T).Name, Array.Empty<byte>(), 
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+    }
+
+    public JsonSchema(JsonSerializerOptions options, string 
jsonSchemaDefinition)
+    {
+        _options = options;
+        var schemaData = Encoding.UTF8.GetBytes(jsonSchemaDefinition);
+        SchemaInfo = new SchemaInfo(typeof(T).Name, schemaData, 
SchemaType.Json, ImmutableDictionary<string, string>.Empty);
+    }
+
+    public SchemaInfo SchemaInfo { get; }
+
+    public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+    {
+        try
+        {
+            var array = bytes.ToArray();
+            return JsonSerializer.Deserialize<T>(array, _options)
+                ?? throw new SchemaSerializationException($"Failed to 
deserialize JSON to type '{typeof(T).Name}'");
+        }
+        catch (JsonException exception)
+        {
+            throw new SchemaSerializationException($"Failed to deserialize 
JSON to type '{typeof(T).Name}'", exception);
+        }
+    }
+
+    public ReadOnlySequence<byte> Encode(T message)
+    {
+        var bytes = JsonSerializer.SerializeToUtf8Bytes(message, _options);
+        return new ReadOnlySequence<byte>(bytes);
+    }
+}
diff --git a/src/DotPulsar/Schemas/ProtobufSchema.cs 
b/src/DotPulsar/Schemas/ProtobufSchema.cs
new file mode 100644
index 0000000..e617aab
--- /dev/null
+++ b/src/DotPulsar/Schemas/ProtobufSchema.cs
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Schemas;
+
+using DotPulsar.Abstractions;
+using Google.Protobuf;
+using System.Buffers;
+using System.Collections.Immutable;
+
+/// <summary>
+/// Schema definition for Protobuf encoded messages.
+/// </summary>
+public sealed class ProtobufSchema<T> : ISchema<T> where T : 
Google.Protobuf.IMessage<T>, new()
+{
+    public ProtobufSchema()
+    {
+        var instance = new T();
+        var name = instance.Descriptor.Name;
+        var data = instance.Descriptor.File.SerializedData.ToByteArray();
+
+        SchemaInfo = new SchemaInfo(name, data, SchemaType.ProtobufNative, 
ImmutableDictionary<string, string>.Empty);
+    }
+
+    public SchemaInfo SchemaInfo { get; }
+
+    public T Decode(ReadOnlySequence<byte> bytes, byte[]? schemaVersion = null)
+    {
+        try
+        {
+            var message = new T();
+            message.MergeFrom(bytes.ToArray());
+            return message;
+        }
+        catch (InvalidProtocolBufferException exception)
+        {
+            throw new Exceptions.SchemaSerializationException($"Failed to 
decode Protobuf message of type '{typeof(T).Name}'", exception);
+        }
+    }
+
+    public ReadOnlySequence<byte> Encode(T message)
+    {
+        var bytes = message.ToByteArray();
+        return new ReadOnlySequence<byte>(bytes);
+    }
+}
diff --git a/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs 
b/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs
new file mode 100644
index 0000000..5e30ee7
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/JsonSchemaTests.cs
@@ -0,0 +1,185 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+
+using DotPulsar.Schemas;
+using DotPulsar.Tests.Schemas.TestSamples.JsonModels;
+using System.Buffers;
+using System.Text.Json;
+
+[Trait("Category", "Unit")]
+public sealed class JsonSchemaTests
+{
+    [Fact]
+    public void 
Constructor_GivenNoArguments_ShouldCreateSchemaWithDefaultOptions()
+    {
+        //Act
+        var schema = new JsonSchema<PersonModel>();
+
+        //Assert
+        schema.SchemaInfo.ShouldNotBeNull();
+    }
+
+    [Fact]
+    public void 
Constructor_GivenCustomOptions_ShouldCreateSchemaWithCustomOptions()
+    {
+        //Arrange
+        var options = new JsonSerializerOptions { PropertyNamingPolicy = 
JsonNamingPolicy.CamelCase };
+
+        //Act
+        var schema = new JsonSchema<PersonModel>(options);
+
+        //Assert
+        schema.SchemaInfo.ShouldNotBeNull();
+    }
+
+    [Fact]
+    public void SchemaInfo_ShouldHaveJsonSchemaType()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+
+        //Act
+        var schemaInfo = schema.SchemaInfo;
+
+        //Assert
+        schemaInfo.Type.ShouldBe(SchemaType.Json);
+    }
+
+    [Fact]
+    public void SchemaInfo_ShouldHaveCorrectName()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+
+        //Act
+        var schemaInfo = schema.SchemaInfo;
+
+        //Assert
+        schemaInfo.Name.ShouldBe("PersonModel");
+    }
+
+    [Fact]
+    public void Encode_GivenValidModel_ShouldReturnJsonBytes()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+        var model = new PersonModel { Name = "Alice", Age = 30 };
+
+        //Act
+        var bytes = schema.Encode(model);
+
+        //Assert
+        bytes.Length.ShouldBeGreaterThan(0);
+    }
+
+    [Fact]
+    public void Decode_GivenValidJsonBytes_ShouldReturnCorrectModel()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+        var expected = new PersonModel { Name = "Bob", Age = 25 };
+        var json = JsonSerializer.SerializeToUtf8Bytes(expected);
+        var bytes = new ReadOnlySequence<byte>(json);
+
+        //Act
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Name.ShouldBe(expected.Name);
+        actual.Age.ShouldBe(expected.Age);
+    }
+
+    [Fact]
+    public void EncodeDecode_GivenValidModel_ShouldRoundTrip()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+        var expected = new PersonModel { Name = "Charlie", Age = 42 };
+
+        //Act
+        var bytes = schema.Encode(expected);
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Name.ShouldBe(expected.Name);
+        actual.Age.ShouldBe(expected.Age);
+    }
+
+    [Fact]
+    public void EncodeDecode_GivenCamelCaseOptions_ShouldRoundTrip()
+    {
+        //Arrange
+        var options = new JsonSerializerOptions { PropertyNamingPolicy = 
JsonNamingPolicy.CamelCase };
+        var schema = new JsonSchema<PersonModel>(options);
+        var expected = new PersonModel { Name = "Diana", Age = 35 };
+
+        //Act
+        var bytes = schema.Encode(expected);
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Name.ShouldBe(expected.Name);
+        actual.Age.ShouldBe(expected.Age);
+    }
+
+    [Fact]
+    public void Encode_GivenCamelCaseOptions_ShouldProduceCamelCaseJson()
+    {
+        //Arrange
+        var options = new JsonSerializerOptions { PropertyNamingPolicy = 
JsonNamingPolicy.CamelCase };
+        var schema = new JsonSchema<PersonModel>(options);
+        var model = new PersonModel { Name = "Eve", Age = 28 };
+
+        //Act
+        var bytes = schema.Encode(model);
+        var json = System.Text.Encoding.UTF8.GetString(bytes.ToArray());
+
+        //Assert
+        json.ShouldContain("\"name\"", Case.Sensitive);
+        json.ShouldContain("\"age\"", Case.Sensitive);
+        json.ShouldNotContain("\"Name\"", Case.Sensitive);
+        json.ShouldNotContain("\"Age\"", Case.Sensitive);
+    }
+
+    [Fact]
+    public void Constructor_GivenSchemaDefinition_ShouldUseItAsSchemaData()
+    {
+        //Arrange
+        var definition = "{\"type\":\"record\",\"name\":\"PersonModel\"}";
+        var options = new JsonSerializerOptions();
+
+        //Act
+        var schema = new JsonSchema<PersonModel>(options, definition);
+
+        //Assert
+        var schemaData = 
System.Text.Encoding.UTF8.GetString(schema.SchemaInfo.Data);
+        schemaData.ShouldBe(definition);
+    }
+
+    [Fact]
+    public void 
Decode_GivenEmptyBytes_ShouldThrowSchemaSerializationException()
+    {
+        //Arrange
+        var schema = new JsonSchema<PersonModel>();
+        var bytes = new ReadOnlySequence<byte>(Array.Empty<byte>());
+
+        //Act
+        var exception = Record.Exception(() => schema.Decode(bytes));
+
+        //Assert
+        
exception.ShouldBeOfType<DotPulsar.Exceptions.SchemaSerializationException>();
+    }
+}
diff --git a/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs 
b/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs
new file mode 100644
index 0000000..2490cd4
--- /dev/null
+++ b/tests/DotPulsar.Tests/Schemas/ProtobufSchemaTests.cs
@@ -0,0 +1,162 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Schemas;
+
+using DotPulsar.Schemas;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+using System.Buffers;
+
+[Trait("Category", "Unit")]
+public sealed class ProtobufSchemaTests
+{
+    [Fact]
+    public void Constructor_GivenValidProtobufMessage_ShouldCreateSchema()
+    {
+        //Act
+        var schema = new ProtobufSchema<StringValue>();
+
+        //Assert
+        schema.SchemaInfo.ShouldNotBeNull();
+    }
+
+    [Fact]
+    public void SchemaInfo_ShouldHaveProtobufNativeSchemaType()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+
+        //Act
+        var schemaInfo = schema.SchemaInfo;
+
+        //Assert
+        schemaInfo.Type.ShouldBe(SchemaType.ProtobufNative);
+    }
+
+    [Fact]
+    public void SchemaInfo_ShouldHaveCorrectName()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+
+        //Act
+        var schemaInfo = schema.SchemaInfo;
+
+        //Assert
+        schemaInfo.Name.ShouldBe("StringValue");
+    }
+
+    [Fact]
+    public void SchemaInfo_ShouldHaveNonEmptySchemaData()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+
+        //Act
+        var schemaInfo = schema.SchemaInfo;
+
+        //Assert
+        schemaInfo.Data.Length.ShouldBeGreaterThan(0);
+    }
+
+    [Fact]
+    public void Encode_GivenValidMessage_ShouldReturnBytes()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+        var message = new StringValue { Value = "Hello Pulsar" };
+
+        //Act
+        var bytes = schema.Encode(message);
+
+        //Assert
+        bytes.Length.ShouldBeGreaterThan(0);
+    }
+
+    [Fact]
+    public void Decode_GivenValidBytes_ShouldReturnCorrectMessage()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+        var expected = new StringValue { Value = "Hello Pulsar" };
+        var protobufBytes = expected.ToByteArray();
+        var bytes = new ReadOnlySequence<byte>(protobufBytes);
+
+        //Act
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Value.ShouldBe(expected.Value);
+    }
+
+    [Fact]
+    public void EncodeDecode_GivenValidMessage_ShouldRoundTrip()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+        var expected = new StringValue { Value = "Round trip test" };
+
+        //Act
+        var bytes = schema.Encode(expected);
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Value.ShouldBe(expected.Value);
+    }
+
+    [Fact]
+    public void EncodeDecode_GivenInt32Value_ShouldRoundTrip()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<Int32Value>();
+        var expected = new Int32Value { Value = 42 };
+
+        //Act
+        var bytes = schema.Encode(expected);
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Value.ShouldBe(expected.Value);
+    }
+
+    [Fact]
+    public void EncodeDecode_GivenEmptyMessage_ShouldRoundTrip()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+        var expected = new StringValue { Value = "" };
+
+        //Act
+        var bytes = schema.Encode(expected);
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Value.ShouldBe(expected.Value);
+    }
+
+    [Fact]
+    public void Decode_GivenEmptyBytes_ShouldReturnDefaultMessage()
+    {
+        //Arrange
+        var schema = new ProtobufSchema<StringValue>();
+        var bytes = new ReadOnlySequence<byte>(Array.Empty<byte>());
+
+        //Act
+        var actual = schema.Decode(bytes);
+
+        //Assert
+        actual.Value.ShouldBe("");
+    }
+}
diff --git a/src/DotPulsar/Exceptions/SchemaSerializationException.cs 
b/tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
similarity index 75%
copy from src/DotPulsar/Exceptions/SchemaSerializationException.cs
copy to tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
index fdb12a0..a52d04c 100644
--- a/src/DotPulsar/Exceptions/SchemaSerializationException.cs
+++ b/tests/DotPulsar.Tests/Schemas/TestSamples/JsonModels/PersonModel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,9 +12,10 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Exceptions;
+namespace DotPulsar.Tests.Schemas.TestSamples.JsonModels;
 
-public sealed class SchemaSerializationException : DotPulsarException
+public sealed class PersonModel
 {
-    public SchemaSerializationException(string message) : base(message) { }
+    public string Name { get; set; } = string.Empty;
+    public int Age { get; set; }
 }

Reply via email to