This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 46dd36f  Add serialization helpers for Schema and RecordBatch (#100)
46dd36f is described below

commit 46dd36fef1c079d20eec20f11823577ad9d6223e
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Sun Oct 5 19:09:27 2025 -0700

    Add serialization helpers for Schema and RecordBatch (#100)
    
    Adds helpers for Schema and RecordBatch to allow them to be serialized
    and deserialized individually. Refactors Flight-related code slightly to
    make use of the new helpers.
    
    Closes #31.
---
 src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs    | 11 +--
 src/Apache.Arrow.Flight/FlightInfo.cs              |  4 +-
 src/Apache.Arrow.Flight/Internal/SchemaWriter.cs   | 45 +++--------
 .../Server/Internal/FlightServerImplementation.cs  |  2 +-
 src/Apache.Arrow/ArrowSerializationHelpers.cs      | 88 ++++++++++++++++++++++
 .../Ipc/ArrowMemoryReaderImplementation.cs         | 10 +++
 .../Apache.Arrow.Tests/SerializationHelperTests.cs | 63 ++++++++++++++++
 7 files changed, 175 insertions(+), 48 deletions(-)

diff --git a/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs b/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
index 6519881..65b8766 100644
--- a/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
+++ b/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
@@ -14,8 +14,6 @@
 // limitations under the License.
 
 using System;
-using System.IO;
-using Apache.Arrow.Ipc;
 
 namespace Apache.Arrow.Flight.Sql;
 
@@ -32,8 +30,7 @@ public static class SchemaExtensions
         {
             throw new ArgumentException("Invalid serialized schema", nameof(serializedSchema));
         }
-        using var reader = new ArrowStreamReader(serializedSchema);
-        return reader.Schema;
+        return ArrowSerializationHelpers.DeserializeSchema(serializedSchema);
     }
 
     /// <summary>
@@ -41,10 +38,6 @@ public static class SchemaExtensions
     /// </summary>
     public static byte[] SerializeSchema(Schema schema)
     {
-        using var memoryStream = new MemoryStream();
-        using var writer = new ArrowStreamWriter(memoryStream, schema);
-        writer.WriteStart();
-        writer.WriteEnd();
-        return memoryStream.ToArray();
+        return ArrowSerializationHelpers.SerializeSchema(schema);
     }
 }
diff --git a/src/Apache.Arrow.Flight/FlightInfo.cs b/src/Apache.Arrow.Flight/FlightInfo.cs
index 1f80ffe..b706262 100644
--- a/src/Apache.Arrow.Flight/FlightInfo.cs
+++ b/src/Apache.Arrow.Flight/FlightInfo.cs
@@ -72,11 +72,9 @@ namespace Apache.Arrow.Flight
 
         internal Protocol.FlightInfo ToProtocol()
         {
-            var serializedSchema = Schema != null ? SchemaWriter.SerializeSchema(Schema) : ByteString.Empty;
-
             var response = new Protocol.FlightInfo()
             {
-                Schema = serializedSchema,
+                Schema = Schema.ToByteString(),
                 FlightDescriptor = Descriptor.ToProtocol(),
                 TotalBytes = TotalBytes,
                 TotalRecords = TotalRecords,
diff --git a/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs b/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
index b8e38ed..8ca4cee 100644
--- a/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
+++ b/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
@@ -13,53 +13,28 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.Arrow.Flatbuf;
-using Apache.Arrow.Flight.Internal;
-using Apache.Arrow.Ipc;
+using Apache.Arrow.Flight;
 using Google.Protobuf;
 
-namespace Apache.Arrow.Flight.Internal
+namespace Apache.Arrow.Flight
 {
-    /// <summary>
-    /// This class handles writing schemas
-    /// </summary>
-    internal class SchemaWriter : ArrowStreamWriter
+    internal static class SchemaWriter
     {
-        internal SchemaWriter(Stream baseStream, Schema schema) : base(baseStream, schema)
+        public static ByteString ToByteString(Schema schema)
         {
-        }
-
-        public void WriteSchema(Schema schema, CancellationToken cancellationToken)
-        {
-            var offset = base.SerializeSchema(schema);
-            WriteMessage(MessageHeader.Schema, offset, 0);
-        }
-
-        public static ByteString SerializeSchema(Schema schema, CancellationToken cancellationToken = default(CancellationToken))
-        {
-            using (var memoryStream = new MemoryStream())
-            {
-                var writer = new SchemaWriter(memoryStream, schema);
-                writer.WriteSchema(schema, cancellationToken);
-
-                memoryStream.Position = 0;
-                return ByteString.FromStream(memoryStream);
-            }
+            return schema == null ?
+                ByteString.Empty :
+                UnsafeByteOperations.UnsafeWrap(ArrowSerializationHelpers.SerializeSchema(schema));
         }
     }
 }
 
 public static class SchemaExtension
 {
-    // Translate an Apache.Arrow.Schema to FlatBuffer Schema to ByteString
+    // This should never have been a public class without a namespace
+    // TODO: Mark as obsolete once sufficient time has passed
     public static ByteString ToByteString(this Apache.Arrow.Schema schema)
     {
-        return SchemaWriter.SerializeSchema(schema);
+        return SchemaWriter.ToByteString(schema);
     }
 }
diff --git a/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs b/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
index e675cb1..0d3daa2 100644
--- a/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
+++ b/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
@@ -70,7 +70,7 @@ namespace Apache.Arrow.Flight.Server.Internal
 
             return new SchemaResult()
             {
-                Schema = SchemaWriter.SerializeSchema(schema)
+                Schema = schema.ToByteString(),
             };
         }
 
diff --git a/src/Apache.Arrow/ArrowSerializationHelpers.cs b/src/Apache.Arrow/ArrowSerializationHelpers.cs
new file mode 100644
index 0000000..88cab52
--- /dev/null
+++ b/src/Apache.Arrow/ArrowSerializationHelpers.cs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using Apache.Arrow.Flatbuf;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow
+{
+    /// <summary>
+    /// Helpers for serializing partial Arrow structures to and from buffers.
+    /// </summary>
+    public static class ArrowSerializationHelpers
+    {
+        public static Schema DeserializeSchema(ReadOnlyMemory<byte> serializedSchema)
+        {
+            ArrowMemoryReaderImplementation implementation = new ArrowMemoryReaderImplementation(serializedSchema, null);
+            return implementation.Schema;
+        }
+
+        public static RecordBatch DeserializeRecordBatch(Schema schema, ReadOnlyMemory<byte> serializedRecordBatch)
+        {
+            ArrowMemoryReaderImplementation implementation = new ArrowMemoryReaderImplementation(schema, serializedRecordBatch, null);
+            return implementation.ReadNextRecordBatch();
+        }
+
+        public static byte[] SerializeSchema(Schema schema)
+        {
+            using (var stream = new MemoryStream())
+            {
+                var writer = new SchemaWriter(stream, schema);
+                writer.WriteSchema(schema);
+                return stream.ToArray();
+            }
+        }
+
+        public static byte[] SerializeRecordBatch(RecordBatch recordBatch)
+        {
+            using (var stream = new MemoryStream())
+            {
+                var writer = new SchemaWriter(stream, recordBatch.Schema);
+                writer.WriteBatch(recordBatch);
+                return stream.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// This class handles writing schemas
+        /// </summary>
+        internal class SchemaWriter : ArrowStreamWriter
+        {
+            internal SchemaWriter(Stream baseStream, Schema schema) : base(baseStream, schema)
+            {
+            }
+
+            public void WriteSchema(Schema schema)
+            {
+                var offset = base.SerializeSchema(schema);
+                WriteMessage(MessageHeader.Schema, offset, 0);
+            }
+
+            public void WriteBatch(RecordBatch recordBatch)
+            {
+                HasWrittenSchema = true; // Avoid serializing the schema
+                WriteRecordBatch(recordBatch);
+                WriteEnd();
+            }
+
+            private protected override void StartingWritingDictionary()
+            {
+                throw new InvalidOperationException("Dictionary batches not supported");
+            }
+        }
+    }
+}
diff --git a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
index 763a5d3..5d619f3 100644
--- a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -33,6 +33,16 @@ namespace Apache.Arrow.Ipc
             _buffer = buffer;
         }
 
+        public ArrowMemoryReaderImplementation(
+            Schema schema,
+            ReadOnlyMemory<byte> buffer,
+            ICompressionCodecFactory compressionCodecFactory
+        ) : base(null, compressionCodecFactory)
+        {
+            _schema = schema;
+            _buffer = buffer;
+        }
+
         public override ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken)
         {
             cancellationToken.ThrowIfCancellationRequested();
diff --git a/test/Apache.Arrow.Tests/SerializationHelperTests.cs b/test/Apache.Arrow.Tests/SerializationHelperTests.cs
new file mode 100644
index 0000000..92372c6
--- /dev/null
+++ b/test/Apache.Arrow.Tests/SerializationHelperTests.cs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.IO;
+using Apache.Arrow.Ipc;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    public class SerializationHelperTests
+    {
+        [Fact]
+        public void SchemaRoundTrip()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100);
+            var serialized = ArrowSerializationHelpers.SerializeSchema(originalBatch.Schema);
+            var deserialized = ArrowSerializationHelpers.DeserializeSchema(serialized);
+
+            SchemaComparer.Compare(originalBatch.Schema, deserialized);
+        }
+
+        [Fact]
+        public void RecordBatchRoundTrip()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100, createDictionaryArray: false);
+            var serialized = ArrowSerializationHelpers.SerializeRecordBatch(originalBatch);
+            var deserialized = ArrowSerializationHelpers.DeserializeRecordBatch(originalBatch.Schema, serialized);
+
+            ArrowReaderVerifier.CompareBatches(originalBatch, deserialized);
+        }
+
+        [Fact]
+        public void ConcatSchemaAndBatchWrite()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100, createDictionaryArray: false);
+            var schema = ArrowSerializationHelpers.SerializeSchema(originalBatch.Schema);
+            var serialized = ArrowSerializationHelpers.SerializeRecordBatch(originalBatch);
+
+            var buffer = new byte[schema.Length + serialized.Length];
+            System.Array.Copy(schema, buffer, schema.Length);
+            System.Array.Copy(serialized, 0, buffer, schema.Length, serialized.Length);
+
+            using (var stream = new MemoryStream(buffer))
+            using (var reader = new ArrowStreamReader(stream))
+            {
+                var deserialized = reader.ReadNextRecordBatch();
+                ArrowReaderVerifier.CompareBatches(originalBatch, deserialized);
+            }
+        }
+    }
+}

Reply via email to