eerhardt commented on a change in pull request #8694: URL: https://github.com/apache/arrow/pull/8694#discussion_r528917658
########## File path: csharp/src/Apache.Arrow.Flight/FlightInfo.cs ########## @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using Apache.Arrow.Flight.Internal; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.Flight +{ + public class FlightInfo + { + internal FlightInfo(Protocol.FlightInfo flightInfo) + { + Schema = FlightMessageSerializer.DecodeSchema(flightInfo.Schema.Memory); + FlightDescriptor = new FlightDescriptor(flightInfo.FlightDescriptor); + + var endpoints = new List<FlightEndpoint>(); + foreach(var endpoint in flightInfo.Endpoint) + { + endpoints.Add(new FlightEndpoint(endpoint)); + } + Endpoints = endpoints; + + TotalBytes = flightInfo.TotalBytes; + TotalRecords = flightInfo.TotalRecords; + } + + public FlightInfo(Schema schema, FlightDescriptor flightDescriptor, IReadOnlyList<FlightEndpoint> flightEndpoints, long totalRecords = 0, long totalBytes = 0) + { + Schema = schema; + FlightDescriptor = flightDescriptor; + Endpoints = flightEndpoints; + TotalBytes = totalBytes; + TotalRecords = totalRecords; + } + + public FlightDescriptor FlightDescriptor { get; } Review comment: I think this property can just be named 
`Descriptor`. The property name doesn't need a `Flight` prefix. ```suggestion public FlightDescriptor Descriptor { get; } ``` ########## File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs ########## @@ -343,7 +343,7 @@ private protected void WriteRecordBatchInternal(RecordBatch recordBatch) FinishedWritingRecordBatch(bodyLength + bodyPaddingLength, metadataLength); } - private Tuple<ArrowRecordBatchFlatBufferBuilder, VectorOffset> PreparingWritingRecordBatch(RecordBatch recordBatch) + private protected Tuple<ArrowRecordBatchFlatBufferBuilder, VectorOffset> PreparingWritingRecordBatch(RecordBatch recordBatch) Review comment: Is this change necessary? I don't see additional callers of this method. ########## File path: csharp/src/Apache.Arrow.Flight/FlightDescriptor.cs ########## @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Google.Protobuf; + +namespace Apache.Arrow.Flight +{ + public class FlightDescriptor + { + private readonly Protocol.FlightDescriptor _flightDescriptor; + + private FlightDescriptor(ByteString command) + { + _flightDescriptor = new Protocol.FlightDescriptor() + { + Cmd = command, + Type = Protocol.FlightDescriptor.Types.DescriptorType.Cmd + }; + } + + private FlightDescriptor(params string[] paths) + { + _flightDescriptor = new Protocol.FlightDescriptor() + { + Type = Protocol.FlightDescriptor.Types.DescriptorType.Path + }; + + foreach(var path in paths) + { + _flightDescriptor.Path.Add(path); + } + } + + + public static FlightDescriptor Command(byte[] command) + { + return new FlightDescriptor(ByteString.CopyFrom(command)); + } + + public static FlightDescriptor Command(string command) + { + return new FlightDescriptor(ByteString.CopyFromUtf8(command)); + } + + public static FlightDescriptor Path(params string[] paths) + { + return new FlightDescriptor(paths); + } + + + internal FlightDescriptor(Protocol.FlightDescriptor flightDescriptor) + { + if(flightDescriptor.Type != Protocol.FlightDescriptor.Types.DescriptorType.Cmd && flightDescriptor.Type != Protocol.FlightDescriptor.Types.DescriptorType.Path) + { + throw new NotSupportedException(); + } + _flightDescriptor = flightDescriptor; + } + + internal Protocol.FlightDescriptor ToProtocol() + { + return _flightDescriptor; + } + + public FlightDescriptorType Type => (FlightDescriptorType)_flightDescriptor.Type; + + public IEnumerable<string> Paths => _flightDescriptor.Path; + + public ByteString Cmd => _flightDescriptor.Cmd; Review comment: We should spell out the full word. .NET design guidelines say not to use abbreviations in public APIs. 
https://docs.microsoft.com/en-us/dotnet/standard/design-guidelines/general-naming-conventions ```suggestion public ByteString Command => _flightDescriptor.Cmd; ``` ########## File path: csharp/src/Apache.Arrow.Flight/Client/FlightRecordBatchDuplexStreamingCall.cs ########## @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Threading.Tasks; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Client +{ + public class FlightRecordBatchDuplexStreamingCall : IDisposable + { + private readonly Func<Status> _getStatusFunc; + private readonly Func<Metadata> _getTrailersFunc; + private readonly Action _disposeAction; + + internal FlightRecordBatchDuplexStreamingCall( + FlightClientRecordBatchStreamWriter requestStream, + IAsyncStreamReader<FlightPutResult> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) + { + RequestStream = requestStream; + ResponseStream = responseStream; + ResponseHeadersAsync = responseHeadersAsync; + _getStatusFunc = getStatusFunc; + _getTrailersFunc = getTrailersFunc; + _disposeAction = disposeAction; + } + + /// <summary> + /// Async stream to read streaming responses. 
+ /// </summary> + public IAsyncStreamReader<FlightPutResult> ResponseStream { get; } + + /// <summary> + /// Async stream to send streaming requests. + /// </summary> + public FlightClientRecordBatchStreamWriter RequestStream { get; } + + /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync { get; } + + /// <summary> + /// Provides means to cleanup after the call. If the call has already finished normally + /// (response stream has been fully read), doesn't do anything. Otherwise, requests + /// cancellation of the call which should terminate all pending async operations + /// associated with the call. As a result, all resources being used by the call should + /// be released eventually. + /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize + /// the "Cancel" semantics of invoking Dispose. + /// </remarks> + public void Dispose() + { + _disposeAction(); + } + + // + // Summary: + // Gets the call status if the call has already finished. Throws InvalidOperationException Review comment: How do callers know when the call has finished? Do they `await` on the `ResponseHeadersAsync`? ########## File path: csharp/src/Apache.Arrow.Flight/Internal/FlightMessageSerializer.cs ########## @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Buffers.Binary; +using System.Collections.Generic; +using System.IO; +using System.Text; +using Apache.Arrow.Ipc; +using FlatBuffers; + +namespace Apache.Arrow.Flight +{ + internal static class FlightMessageSerializer + { + public static Schema DecodeSchema(ReadOnlyMemory<byte> buffer) + { + int bufferPosition = 0; + int schemaMessageLength = BinaryPrimitives.ReadInt32LittleEndian(buffer.Span.Slice(bufferPosition)); + bufferPosition += sizeof(int); + + if (schemaMessageLength == MessageSerializer.IpcContinuationToken) + { + // ARROW-6313, if the first 4 bytes are continuation message, read the next 4 for the length + if (buffer.Length <= bufferPosition + sizeof(int)) + { + throw new InvalidDataException("Corrupted IPC message. Received a continuation token at the end of the message."); + } + + schemaMessageLength = BinaryPrimitives.ReadInt32LittleEndian(buffer.Span.Slice(bufferPosition)); + bufferPosition += sizeof(int); + } + + ByteBuffer schemaBuffer = ArrowReaderImplementation.CreateByteBuffer(buffer.Slice(bufferPosition)); + var Schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer)); + return Schema; + } + + internal static Schema DecodeSchema(ByteBuffer schemaBuffer) + { + var Schema = MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer)); + return Schema; Review comment: (nit) variable names should be camelCased. Or just skip the variable altogether. 
```suggestion return MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer)); } internal static Schema DecodeSchema(ByteBuffer schemaBuffer) { return MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer)); ``` ########## File path: csharp/src/Apache.Arrow.Flight/Client/FlightClientRecordBatchStreamWriter.cs ########## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Protocol; +using Apache.Arrow.Flight.Internal; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Client +{ + public class FlightClientRecordBatchStreamWriter : FlightRecordBatchStreamWriter, IClientStreamWriter<RecordBatch> + { + private readonly IClientStreamWriter<FlightData> _clientStreamWriter; + private bool _completed = false; + internal FlightClientRecordBatchStreamWriter(IClientStreamWriter<FlightData> clientStreamWriter, FlightDescriptor flightDescriptor) : base(clientStreamWriter, flightDescriptor) + { + _clientStreamWriter = clientStreamWriter; + } + + protected override void Dispose(bool disposing) + { + CompleteAsync().Wait(); Review comment: Is this necessary? Having sync-over-async is typically not good. What are the ramifications if someone doesn't call CompleteAsync() before disposing? Should we just `throw` in that case? ########## File path: csharp/src/Apache.Arrow.Flight/Client/FlightRecordBatchDuplexStreamingCall.cs ########## @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.Threading.Tasks; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Client +{ + public class FlightRecordBatchDuplexStreamingCall : IDisposable + { + private readonly Func<Status> _getStatusFunc; + private readonly Func<Metadata> _getTrailersFunc; + private readonly Action _disposeAction; + + internal FlightRecordBatchDuplexStreamingCall( + FlightClientRecordBatchStreamWriter requestStream, + IAsyncStreamReader<FlightPutResult> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) + { + RequestStream = requestStream; + ResponseStream = responseStream; + ResponseHeadersAsync = responseHeadersAsync; + _getStatusFunc = getStatusFunc; + _getTrailersFunc = getTrailersFunc; + _disposeAction = disposeAction; + } + + /// <summary> + /// Async stream to read streaming responses. + /// </summary> + public IAsyncStreamReader<FlightPutResult> ResponseStream { get; } + + /// <summary> + /// Async stream to send streaming requests. + /// </summary> + public FlightClientRecordBatchStreamWriter RequestStream { get; } + + /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync { get; } + + /// <summary> + /// Provides means to cleanup after the call. If the call has already finished normally + /// (response stream has been fully read), doesn't do anything. Otherwise, requests + /// cancellation of the call which should terminate all pending async operations + /// associated with the call. As a result, all resources being used by the call should + /// be released eventually. + /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize + /// the "Cancel" semantics of invoking Dispose. + /// </remarks> + public void Dispose() + { + _disposeAction(); + } + + // + // Summary: Review comment: Can these be `xml` comments? 
########## File path: csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamWriter.cs ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Protocol; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Internal +{ + public abstract class FlightRecordBatchStreamWriter : IAsyncStreamWriter<RecordBatch>, IDisposable + { + private FlightDataStream _flightDataStream; + private readonly IAsyncStreamWriter<FlightData> _clientStreamWriter; + private readonly FlightDescriptor _flightDescriptor; + + private bool _disposed; + + private protected FlightRecordBatchStreamWriter(IAsyncStreamWriter<FlightData> clientStreamWriter, FlightDescriptor flightDescriptor) + { + _clientStreamWriter = clientStreamWriter; + _flightDescriptor = flightDescriptor; + } + + private void SetupStream(Schema schema) + { + _flightDataStream = new FlightDataStream(_clientStreamWriter, _flightDescriptor, schema); + } + + public WriteOptions WriteOptions { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + public Task WriteAsync(RecordBatch 
message) + { + return WriteAsync(message, default); + } + + public Task WriteAsync(RecordBatch message, ByteString applicationMetadata) + { + if (_flightDataStream == null) + { + SetupStream(message.Schema); + } + + return _flightDataStream.Write(message, applicationMetadata); + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + _flightDataStream.Dispose(); + _disposed = true; + } + } + + ~FlightRecordBatchStreamWriter() Review comment: We should remove any finalizers. The only reason to use a finalizer is when a class is directly managing native resources. And even then, we try to use SafeHandles when possible. ########## File path: csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamReader.cs ########## @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flatbuf; +using Apache.Arrow.Flight.Protocol; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Internal +{ + /// <summary> + /// Stream of record batches + /// + /// Use MoveNext() and Current to iterate over the batches. 
+ /// There are also gRPC helper functions such as ToListAsync() etc. + /// </summary> + public abstract class FlightRecordBatchStreamReader : IAsyncStreamReader<RecordBatch> Review comment: Can this class also implement `IAsyncEnumerable<RecordBatch>`? Then callers can just directly `await foreach` it, without having to call `ReadAllAsync()`. My understanding of why `IAsyncStreamReader<T>` doesn't implement `IAsyncEnumerable<T>` is because the Grpc interface came first, and it couldn't be changed without a breaking change. @JamesNK - is this the case? ########## File path: csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamReader.cs ########## @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flatbuf; +using Apache.Arrow.Flight.Protocol; +using Apache.Arrow.Ipc; +using Google.Protobuf; +using Grpc.Core; + +namespace Apache.Arrow.Flight.Internal Review comment: Can we move this to the root namespace? I don't really like public types in an `Internal` namespace. It feels like an anti-pattern to me. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
