This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new 46dd36f Add serialization helpers for Schema and RecordBatch (#100)
46dd36f is described below
commit 46dd36fef1c079d20eec20f11823577ad9d6223e
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Sun Oct 5 19:09:27 2025 -0700
Add serialization helpers for Schema and RecordBatch (#100)
Adds ArrowSerializationHelpers, which allow a Schema or a RecordBatch to be
serialized and deserialized on its own rather than only as part of a complete
IPC stream. Refactors the Flight-related code slightly to use the new helpers.
Closes #31.
---
src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs | 11 +--
src/Apache.Arrow.Flight/FlightInfo.cs | 4 +-
src/Apache.Arrow.Flight/Internal/SchemaWriter.cs | 45 +++--------
.../Server/Internal/FlightServerImplementation.cs | 2 +-
src/Apache.Arrow/ArrowSerializationHelpers.cs | 88 ++++++++++++++++++++++
.../Ipc/ArrowMemoryReaderImplementation.cs | 10 +++
.../Apache.Arrow.Tests/SerializationHelperTests.cs | 63 ++++++++++++++++
7 files changed, 175 insertions(+), 48 deletions(-)
diff --git a/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
b/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
index 6519881..65b8766 100644
--- a/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
+++ b/src/Apache.Arrow.Flight.Sql/SchemaExtensions.cs
@@ -14,8 +14,6 @@
// limitations under the License.
using System;
-using System.IO;
-using Apache.Arrow.Ipc;
namespace Apache.Arrow.Flight.Sql;
@@ -32,8 +30,7 @@ public static class SchemaExtensions
{
throw new ArgumentException("Invalid serialized schema",
nameof(serializedSchema));
}
- using var reader = new ArrowStreamReader(serializedSchema);
- return reader.Schema;
+ return ArrowSerializationHelpers.DeserializeSchema(serializedSchema);
}
/// <summary>
@@ -41,10 +38,6 @@ public static class SchemaExtensions
/// </summary>
public static byte[] SerializeSchema(Schema schema)
{
- using var memoryStream = new MemoryStream();
- using var writer = new ArrowStreamWriter(memoryStream, schema);
- writer.WriteStart();
- writer.WriteEnd();
- return memoryStream.ToArray();
+ // Delegates to the shared core helper. NOTE(review): that helper writes only the schema
+ // message and never calls WriteEnd(), whereas the previous implementation here did;
+ // confirm peers that parse this schema do not rely on a trailing end-of-stream marker.
+ return ArrowSerializationHelpers.SerializeSchema(schema);
}
}
diff --git a/src/Apache.Arrow.Flight/FlightInfo.cs
b/src/Apache.Arrow.Flight/FlightInfo.cs
index 1f80ffe..b706262 100644
--- a/src/Apache.Arrow.Flight/FlightInfo.cs
+++ b/src/Apache.Arrow.Flight/FlightInfo.cs
@@ -72,11 +72,9 @@ namespace Apache.Arrow.Flight
internal Protocol.FlightInfo ToProtocol()
{
- var serializedSchema = Schema != null ?
SchemaWriter.SerializeSchema(Schema) : ByteString.Empty;
-
var response = new Protocol.FlightInfo()
{
- Schema = serializedSchema,
+ Schema = Schema.ToByteString(),
FlightDescriptor = Descriptor.ToProtocol(),
TotalBytes = TotalBytes,
TotalRecords = TotalRecords,
diff --git a/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
b/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
index b8e38ed..8ca4cee 100644
--- a/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
+++ b/src/Apache.Arrow.Flight/Internal/SchemaWriter.cs
@@ -13,53 +13,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.Arrow.Flatbuf;
-using Apache.Arrow.Flight.Internal;
-using Apache.Arrow.Ipc;
+using Apache.Arrow.Flight;
using Google.Protobuf;
-namespace Apache.Arrow.Flight.Internal
+namespace Apache.Arrow.Flight
{
- /// <summary>
- /// This class handles writing schemas
- /// </summary>
- internal class SchemaWriter : ArrowStreamWriter
+ /// <summary>
+ /// Converts Arrow schemas into protobuf <see cref="ByteString"/> values for the
+ /// schema fields of Flight protocol messages.
+ /// </summary>
+ internal static class SchemaWriter
{
- internal SchemaWriter(Stream baseStream, Schema schema) :
base(baseStream, schema)
+ /// <summary>
+ /// Serializes <paramref name="schema"/> with ArrowSerializationHelpers.SerializeSchema and
+ /// wraps the resulting array in a <see cref="ByteString"/>; returns
+ /// <see cref="ByteString.Empty"/> when <paramref name="schema"/> is null.
+ /// </summary>
+ /// <remarks>
+ /// UnsafeWrap avoids a copy. This is safe only because the helper returns a freshly
+ /// allocated array (MemoryStream.ToArray) that nothing else holds or mutates.
+ /// </remarks>
+ public static ByteString ToByteString(Schema schema)
{
- }
-
- public void WriteSchema(Schema schema, CancellationToken
cancellationToken)
- {
- var offset = base.SerializeSchema(schema);
- WriteMessage(MessageHeader.Schema, offset, 0);
- }
-
- public static ByteString SerializeSchema(Schema schema,
CancellationToken cancellationToken = default(CancellationToken))
- {
- using (var memoryStream = new MemoryStream())
- {
- var writer = new SchemaWriter(memoryStream, schema);
- writer.WriteSchema(schema, cancellationToken);
-
- memoryStream.Position = 0;
- return ByteString.FromStream(memoryStream);
- }
+ return schema == null ?
+ ByteString.Empty :
+
UnsafeByteOperations.UnsafeWrap(ArrowSerializationHelpers.SerializeSchema(schema));
}
}
}
public static class SchemaExtension
{
- // Translate an Apache.Arrow.Schema to FlatBuffer Schema to ByteString
+ // This should never have been a public class without a namespace
+ // TODO: Mark as obsolete once sufficient time has passed
+ /// <summary>
+ /// Serializes <paramref name="schema"/> into a <see cref="ByteString"/> for use in Flight
+ /// protocol messages. A null <paramref name="schema"/> yields <see cref="ByteString.Empty"/>
+ /// (this is an extension method, so it may be invoked on a null reference).
+ /// </summary>
public static ByteString ToByteString(this Apache.Arrow.Schema schema)
{
- return SchemaWriter.SerializeSchema(schema);
+ return SchemaWriter.ToByteString(schema);
}
}
diff --git
a/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
b/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
index e675cb1..0d3daa2 100644
--- a/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
+++ b/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs
@@ -70,7 +70,7 @@ namespace Apache.Arrow.Flight.Server.Internal
return new SchemaResult()
{
- Schema = SchemaWriter.SerializeSchema(schema)
+ Schema = schema.ToByteString(),
};
}
diff --git a/src/Apache.Arrow/ArrowSerializationHelpers.cs
b/src/Apache.Arrow/ArrowSerializationHelpers.cs
new file mode 100644
index 0000000..88cab52
--- /dev/null
+++ b/src/Apache.Arrow/ArrowSerializationHelpers.cs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using Apache.Arrow.Flatbuf;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow
+{
+    /// <summary>
+    /// Helpers for serializing individual Arrow structures (a <see cref="Schema"/> or a single
+    /// <see cref="RecordBatch"/>) to and from byte buffers.
+    /// </summary>
+    public static class ArrowSerializationHelpers
+    {
+        /// <summary>
+        /// Deserializes a schema from <paramref name="serializedSchema"/>, for example a buffer
+        /// produced by <see cref="SerializeSchema(Schema)"/>.
+        /// </summary>
+        /// <param name="serializedSchema">Buffer containing the serialized schema.</param>
+        /// <returns>The deserialized <see cref="Schema"/>.</returns>
+        public static Schema DeserializeSchema(ReadOnlyMemory<byte> serializedSchema)
+        {
+            var implementation = new ArrowMemoryReaderImplementation(serializedSchema, null);
+            return implementation.Schema;
+        }
+
+        /// <summary>
+        /// Deserializes a single record batch from <paramref name="serializedRecordBatch"/> using a
+        /// schema the caller already has, for example the output of
+        /// <see cref="SerializeRecordBatch(RecordBatch)"/> paired with the original batch's schema.
+        /// </summary>
+        /// <param name="schema">Schema used to interpret the serialized batch.</param>
+        /// <param name="serializedRecordBatch">Buffer containing the serialized record batch.</param>
+        /// <returns>The next record batch read from <paramref name="serializedRecordBatch"/>.</returns>
+        public static RecordBatch DeserializeRecordBatch(Schema schema, ReadOnlyMemory<byte> serializedRecordBatch)
+        {
+            var implementation = new ArrowMemoryReaderImplementation(schema, serializedRecordBatch, null);
+            return implementation.ReadNextRecordBatch();
+        }
+
+        /// <summary>
+        /// Serializes <paramref name="schema"/> as a single IPC schema message. The output can be
+        /// followed by the output of <see cref="SerializeRecordBatch(RecordBatch)"/> to form a
+        /// stream readable by <see cref="ArrowStreamReader"/>.
+        /// </summary>
+        /// <param name="schema">The schema to serialize.</param>
+        /// <returns>The serialized schema bytes.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="schema"/> is <c>null</c>.</exception>
+        public static byte[] SerializeSchema(Schema schema)
+        {
+            if (schema == null)
+            {
+                throw new ArgumentNullException(nameof(schema));
+            }
+
+            using (var stream = new MemoryStream())
+            using (var writer = new SchemaWriter(stream, schema))
+            {
+                writer.WriteSchema(schema);
+                return stream.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// Serializes <paramref name="recordBatch"/> without its schema message, terminated by
+        /// <c>WriteEnd()</c>. Appending the result to the output of
+        /// <see cref="SerializeSchema(Schema)"/> yields a stream readable by
+        /// <see cref="ArrowStreamReader"/>.
+        /// </summary>
+        /// <param name="recordBatch">The record batch to serialize.</param>
+        /// <returns>The serialized record batch bytes.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="recordBatch"/> is <c>null</c>.</exception>
+        /// <exception cref="InvalidOperationException">
+        /// Writing the batch would require a dictionary batch, which is not supported.
+        /// </exception>
+        public static byte[] SerializeRecordBatch(RecordBatch recordBatch)
+        {
+            if (recordBatch == null)
+            {
+                throw new ArgumentNullException(nameof(recordBatch));
+            }
+
+            using (var stream = new MemoryStream())
+            using (var writer = new SchemaWriter(stream, recordBatch.Schema))
+            {
+                writer.WriteBatch(recordBatch);
+                return stream.ToArray();
+            }
+        }
+
+        /// <summary>
+        /// <see cref="ArrowStreamWriter"/> that writes either a lone schema message or a lone
+        /// record batch (without a preceding schema message).
+        /// </summary>
+        internal class SchemaWriter : ArrowStreamWriter
+        {
+            internal SchemaWriter(Stream baseStream, Schema schema) : base(baseStream, schema)
+            {
+            }
+
+            /// <summary>Writes only the schema message for <paramref name="schema"/>.</summary>
+            public void WriteSchema(Schema schema)
+            {
+                var offset = base.SerializeSchema(schema);
+                WriteMessage(MessageHeader.Schema, offset, 0);
+            }
+
+            /// <summary>
+            /// Writes <paramref name="recordBatch"/> and then calls <c>WriteEnd()</c>, skipping the
+            /// schema message.
+            /// </summary>
+            public void WriteBatch(RecordBatch recordBatch)
+            {
+                HasWrittenSchema = true; // Avoid serializing the schema
+                WriteRecordBatch(recordBatch);
+                WriteEnd();
+            }
+
+            /// <summary>Dictionary batches are not supported by these helpers.</summary>
+            private protected override void StartingWritingDictionary()
+            {
+                throw new InvalidOperationException("Dictionary batches not supported");
+            }
+        }
+    }
+}
diff --git a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
index 763a5d3..5d619f3 100644
--- a/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
+++ b/src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs
@@ -33,6 +33,16 @@ namespace Apache.Arrow.Ipc
_buffer = buffer;
}
+        /// <summary>
+        /// Creates a reader over <paramref name="buffer"/> with the schema supplied by the caller.
+        /// </summary>
+        /// <param name="schema">Schema describing the record batches in <paramref name="buffer"/>.</param>
+        /// <param name="buffer">Serialized IPC data to read from.</param>
+        /// <param name="compressionCodecFactory">Factory for decompression codecs; callers may pass <c>null</c>.</param>
+        public ArrowMemoryReaderImplementation(Schema schema, ReadOnlyMemory<byte> buffer, ICompressionCodecFactory compressionCodecFactory)
+            : base(null, compressionCodecFactory)
+        {
+            _buffer = buffer;
+            _schema = schema;
+        }
+
public override ValueTask<Schema> ReadSchemaAsync(CancellationToken
cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
diff --git a/test/Apache.Arrow.Tests/SerializationHelperTests.cs
b/test/Apache.Arrow.Tests/SerializationHelperTests.cs
new file mode 100644
index 0000000..92372c6
--- /dev/null
+++ b/test/Apache.Arrow.Tests/SerializationHelperTests.cs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.IO;
+using Apache.Arrow.Ipc;
+using Xunit;
+
+namespace Apache.Arrow.Tests
+{
+    /// <summary>
+    /// Round-trip tests for <see cref="ArrowSerializationHelpers"/>.
+    /// </summary>
+    public class SerializationHelperTests
+    {
+        [Fact]
+        public void SchemaRoundTrip()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100);
+
+            var serialized = ArrowSerializationHelpers.SerializeSchema(originalBatch.Schema);
+            var deserialized = ArrowSerializationHelpers.DeserializeSchema(serialized);
+
+            SchemaComparer.Compare(originalBatch.Schema, deserialized);
+        }
+
+        [Fact]
+        public void RecordBatchRoundTrip()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100, createDictionaryArray: false);
+
+            var serialized = ArrowSerializationHelpers.SerializeRecordBatch(originalBatch);
+            var deserialized = ArrowSerializationHelpers.DeserializeRecordBatch(originalBatch.Schema, serialized);
+
+            ArrowReaderVerifier.CompareBatches(originalBatch, deserialized);
+        }
+
+        [Fact]
+        public void ConcatSchemaAndBatchWrite()
+        {
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100, createDictionaryArray: false);
+            var schema = ArrowSerializationHelpers.SerializeSchema(originalBatch.Schema);
+            var serialized = ArrowSerializationHelpers.SerializeRecordBatch(originalBatch);
+
+            // Schema bytes followed by batch bytes should be readable as one IPC stream.
+            var buffer = new byte[schema.Length + serialized.Length];
+            schema.CopyTo(buffer, 0);
+            serialized.CopyTo(buffer, schema.Length);
+
+            using (var stream = new MemoryStream(buffer))
+            using (var reader = new ArrowStreamReader(stream))
+            {
+                var deserialized = reader.ReadNextRecordBatch();
+                ArrowReaderVerifier.CompareBatches(originalBatch, deserialized);
+
+                // Exactly one batch was written, so the stream must end cleanly after it.
+                Assert.Null(reader.ReadNextRecordBatch());
+            }
+        }
+
+        [Fact]
+        public void SerializeRecordBatchWithDictionaryThrows()
+        {
+            // The helper's writer rejects dictionary batches; pin that limitation.
+            RecordBatch originalBatch = TestData.CreateSampleRecordBatch(100, createDictionaryArray: true);
+
+            Assert.Throws<System.InvalidOperationException>(
+                () => ArrowSerializationHelpers.SerializeRecordBatch(originalBatch));
+        }
+    }
+}