eerhardt commented on a change in pull request #8694:
URL: https://github.com/apache/arrow/pull/8694#discussion_r528917658



##########
File path: csharp/src/Apache.Arrow.Flight/FlightInfo.cs
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Apache.Arrow.Flight.Internal;
+using Apache.Arrow.Ipc;
+
+namespace Apache.Arrow.Flight
+{
+    public class FlightInfo
+    {
+        internal FlightInfo(Protocol.FlightInfo flightInfo)
+        {
+            Schema = 
FlightMessageSerializer.DecodeSchema(flightInfo.Schema.Memory);
+            FlightDescriptor = new 
FlightDescriptor(flightInfo.FlightDescriptor);
+
+            var endpoints = new List<FlightEndpoint>();
+            foreach(var endpoint in flightInfo.Endpoint)
+            {
+                endpoints.Add(new FlightEndpoint(endpoint));
+            }
+            Endpoints = endpoints;
+
+            TotalBytes = flightInfo.TotalBytes;
+            TotalRecords = flightInfo.TotalRecords;
+        }
+
+        public FlightInfo(Schema schema, FlightDescriptor flightDescriptor, 
IReadOnlyList<FlightEndpoint> flightEndpoints, long totalRecords = 0, long 
totalBytes = 0)
+        {
+            Schema = schema;
+            FlightDescriptor = flightDescriptor;
+            Endpoints = flightEndpoints;
+            TotalBytes = totalBytes;
+            TotalRecords = totalRecords;
+        }
+
+        public FlightDescriptor FlightDescriptor { get; }

Review comment:
       I think this property can just be named `Descriptor`. The property name 
doesn't need a `Flight` prefix.
   ```suggestion
           public FlightDescriptor Descriptor { get; }
   ```

##########
File path: csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
##########
@@ -343,7 +343,7 @@ private protected void WriteRecordBatchInternal(RecordBatch 
recordBatch)
             FinishedWritingRecordBatch(bodyLength + bodyPaddingLength, 
metadataLength);
         }
 
-        private Tuple<ArrowRecordBatchFlatBufferBuilder, VectorOffset> 
PreparingWritingRecordBatch(RecordBatch recordBatch)
+        private protected Tuple<ArrowRecordBatchFlatBufferBuilder, 
VectorOffset> PreparingWritingRecordBatch(RecordBatch recordBatch)

Review comment:
       Is this change necessary? I don't see additional callers of this method.

##########
File path: csharp/src/Apache.Arrow.Flight/FlightDescriptor.cs
##########
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight
+{
+    public class FlightDescriptor
+    {
+        private readonly Protocol.FlightDescriptor _flightDescriptor;
+
+        private FlightDescriptor(ByteString command)
+        {
+            _flightDescriptor = new Protocol.FlightDescriptor()
+            {
+                Cmd = command,
+                Type = Protocol.FlightDescriptor.Types.DescriptorType.Cmd
+            };
+        }
+
+        private FlightDescriptor(params string[] paths)
+        {
+            _flightDescriptor = new Protocol.FlightDescriptor()
+            {
+                Type = Protocol.FlightDescriptor.Types.DescriptorType.Path
+            };
+
+            foreach(var path in paths)
+            {
+                _flightDescriptor.Path.Add(path);
+            }
+        }
+
+
+        public static FlightDescriptor Command(byte[] command)
+        {
+            return new FlightDescriptor(ByteString.CopyFrom(command));
+        }
+
+        public static FlightDescriptor Command(string command)
+        {
+            return new FlightDescriptor(ByteString.CopyFromUtf8(command));
+        }
+
+        public static FlightDescriptor Path(params string[] paths)
+        {
+            return new FlightDescriptor(paths);
+        }
+
+
+        internal FlightDescriptor(Protocol.FlightDescriptor flightDescriptor)
+        {
+            if(flightDescriptor.Type != 
Protocol.FlightDescriptor.Types.DescriptorType.Cmd && flightDescriptor.Type != 
Protocol.FlightDescriptor.Types.DescriptorType.Path)
+            {
+                throw new NotSupportedException();
+            }
+            _flightDescriptor = flightDescriptor;
+        }
+
+        internal Protocol.FlightDescriptor ToProtocol()
+        {
+            return _flightDescriptor;
+        }
+
+        public FlightDescriptorType Type => 
(FlightDescriptorType)_flightDescriptor.Type;
+
+        public IEnumerable<string> Paths => _flightDescriptor.Path;
+
+        public ByteString Cmd => _flightDescriptor.Cmd;

Review comment:
       We should spell out the full word. .NET design guidelines say not to use 
abbreviations in public APIs.
   
   
https://docs.microsoft.com/en-us/dotnet/standard/design-guidelines/general-naming-conventions
   
   ```suggestion
           public ByteString Command => _flightDescriptor.Cmd;
   ```
   

##########
File path: 
csharp/src/Apache.Arrow.Flight/Client/FlightRecordBatchDuplexStreamingCall.cs
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Client
+{
+    public class FlightRecordBatchDuplexStreamingCall : IDisposable
+    {
+        private readonly Func<Status> _getStatusFunc;
+        private readonly Func<Metadata> _getTrailersFunc;
+        private readonly Action _disposeAction;
+
+        internal FlightRecordBatchDuplexStreamingCall(
+            FlightClientRecordBatchStreamWriter requestStream,
+            IAsyncStreamReader<FlightPutResult> responseStream,
+            Task<Metadata> responseHeadersAsync,
+            Func<Status> getStatusFunc,
+            Func<Metadata> getTrailersFunc,
+            Action disposeAction)
+        {
+            RequestStream = requestStream;
+            ResponseStream = responseStream;
+            ResponseHeadersAsync = responseHeadersAsync;
+            _getStatusFunc = getStatusFunc;
+            _getTrailersFunc = getTrailersFunc;
+            _disposeAction = disposeAction;
+        }
+
+        /// <summary>
+        ///  Async stream to read streaming responses.
+        /// </summary>
+        public IAsyncStreamReader<FlightPutResult> ResponseStream { get; }
+
+        /// <summary>
+        /// Async stream to send streaming requests.
+        /// </summary>
+        public FlightClientRecordBatchStreamWriter RequestStream { get; }
+
+        /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync { get; }
+
+        /// <summary>
+        /// Provides means to cleanup after the call. If the call has already 
finished normally
+        /// (response stream has been fully read), doesn't do anything. 
Otherwise, requests
+        /// cancellation of the call which should terminate all pending async 
operations
+        /// associated with the call. As a result, all resources being used by 
the call should
+        /// be released eventually.
+        /// </summary>
+        /// <remarks>
+        /// Normally, there is no need for you to dispose the call unless you 
want to utilize
+        /// the "Cancel" semantics of invoking Dispose.
+        /// </remarks>
+        public void Dispose()
+        {
+            _disposeAction();
+        }
+
+        //
+        // Summary:
+        //     Gets the call status if the call has already finished. Throws 
InvalidOperationException

Review comment:
       How do callers know when the call has finished? Do they `await` on the 
`ResponseHeadersAsync`?

##########
File path: csharp/src/Apache.Arrow.Flight/Internal/FlightMessageSerializer.cs
##########
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Buffers.Binary;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using Apache.Arrow.Ipc;
+using FlatBuffers;
+
+namespace Apache.Arrow.Flight
+{
+    internal static class FlightMessageSerializer
+    {
+        public static Schema DecodeSchema(ReadOnlyMemory<byte> buffer)
+        {
+            int bufferPosition = 0;
+            int schemaMessageLength = 
BinaryPrimitives.ReadInt32LittleEndian(buffer.Span.Slice(bufferPosition));
+            bufferPosition += sizeof(int);
+
+            if (schemaMessageLength == MessageSerializer.IpcContinuationToken)
+            {
+                // ARROW-6313, if the first 4 bytes are continuation message, 
read the next 4 for the length
+                if (buffer.Length <= bufferPosition + sizeof(int))
+                {
+                    throw new InvalidDataException("Corrupted IPC message. 
Received a continuation token at the end of the message.");
+                }
+
+                schemaMessageLength = 
BinaryPrimitives.ReadInt32LittleEndian(buffer.Span.Slice(bufferPosition));
+                bufferPosition += sizeof(int);
+            }
+
+            ByteBuffer schemaBuffer = 
ArrowReaderImplementation.CreateByteBuffer(buffer.Slice(bufferPosition));
+            var Schema = 
MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
+            return Schema;
+        }
+
+        internal static Schema DecodeSchema(ByteBuffer schemaBuffer)
+        {
+            var Schema = 
MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
+            return Schema;

Review comment:
       (nit) variable names should be camelCased. Or just skip the variable 
altogether.
   
   ```suggestion
               return 
MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
           }
   
           internal static Schema DecodeSchema(ByteBuffer schemaBuffer)
           {
               return 
MessageSerializer.GetSchema(ArrowReaderImplementation.ReadMessage<Flatbuf.Schema>(schemaBuffer));
   ```

##########
File path: 
csharp/src/Apache.Arrow.Flight/Client/FlightClientRecordBatchStreamWriter.cs
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Protocol;
+using Apache.Arrow.Flight.Internal;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Client
+{
+    public class FlightClientRecordBatchStreamWriter : 
FlightRecordBatchStreamWriter, IClientStreamWriter<RecordBatch>
+    {
+        private readonly IClientStreamWriter<FlightData> _clientStreamWriter;
+        private bool _completed = false;
+        internal 
FlightClientRecordBatchStreamWriter(IClientStreamWriter<FlightData> 
clientStreamWriter, FlightDescriptor flightDescriptor) : 
base(clientStreamWriter, flightDescriptor)
+        {
+            _clientStreamWriter = clientStreamWriter;
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            CompleteAsync().Wait();

Review comment:
       Is this necessary? Having sync-over-async is typically not good.
   
   What are the ramifications if someone doesn't call CompleteAsync() before 
disposing? Should we just `throw` in that case?

##########
File path: 
csharp/src/Apache.Arrow.Flight/Client/FlightRecordBatchDuplexStreamingCall.cs
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Client
+{
+    public class FlightRecordBatchDuplexStreamingCall : IDisposable
+    {
+        private readonly Func<Status> _getStatusFunc;
+        private readonly Func<Metadata> _getTrailersFunc;
+        private readonly Action _disposeAction;
+
+        internal FlightRecordBatchDuplexStreamingCall(
+            FlightClientRecordBatchStreamWriter requestStream,
+            IAsyncStreamReader<FlightPutResult> responseStream,
+            Task<Metadata> responseHeadersAsync,
+            Func<Status> getStatusFunc,
+            Func<Metadata> getTrailersFunc,
+            Action disposeAction)
+        {
+            RequestStream = requestStream;
+            ResponseStream = responseStream;
+            ResponseHeadersAsync = responseHeadersAsync;
+            _getStatusFunc = getStatusFunc;
+            _getTrailersFunc = getTrailersFunc;
+            _disposeAction = disposeAction;
+        }
+
+        /// <summary>
+        ///  Async stream to read streaming responses.
+        /// </summary>
+        public IAsyncStreamReader<FlightPutResult> ResponseStream { get; }
+
+        /// <summary>
+        /// Async stream to send streaming requests.
+        /// </summary>
+        public FlightClientRecordBatchStreamWriter RequestStream { get; }
+
+        /// <summary>
+        /// Asynchronous access to response headers.
+        /// </summary>
+        public Task<Metadata> ResponseHeadersAsync { get; }
+
+        /// <summary>
+        /// Provides means to cleanup after the call. If the call has already 
finished normally
+        /// (response stream has been fully read), doesn't do anything. 
Otherwise, requests
+        /// cancellation of the call which should terminate all pending async 
operations
+        /// associated with the call. As a result, all resources being used by 
the call should
+        /// be released eventually.
+        /// </summary>
+        /// <remarks>
+        /// Normally, there is no need for you to dispose the call unless you 
want to utilize
+        /// the "Cancel" semantics of invoking Dispose.
+        /// </remarks>
+        public void Dispose()
+        {
+            _disposeAction();
+        }
+
+        //
+        // Summary:

Review comment:
       Can these be XML doc comments (`///`), like the other members of this class?

##########
File path: 
csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamWriter.cs
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Protocol;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Internal
+{
+    public abstract class FlightRecordBatchStreamWriter : 
IAsyncStreamWriter<RecordBatch>, IDisposable
+    {
+        private FlightDataStream _flightDataStream;
+        private readonly IAsyncStreamWriter<FlightData> _clientStreamWriter;
+        private readonly FlightDescriptor _flightDescriptor;
+
+        private bool _disposed;
+
+        private protected 
FlightRecordBatchStreamWriter(IAsyncStreamWriter<FlightData> 
clientStreamWriter, FlightDescriptor flightDescriptor)
+        {
+            _clientStreamWriter = clientStreamWriter;
+            _flightDescriptor = flightDescriptor;
+        }
+
+        private void SetupStream(Schema schema)
+        {
+            _flightDataStream = new FlightDataStream(_clientStreamWriter, 
_flightDescriptor, schema);
+        }
+
+        public WriteOptions WriteOptions { get => throw new 
NotImplementedException(); set => throw new NotImplementedException(); }
+
+        public Task WriteAsync(RecordBatch message)
+        {
+            return WriteAsync(message, default);
+        }
+
+        public Task WriteAsync(RecordBatch message, ByteString 
applicationMetadata)
+        {
+            if (_flightDataStream == null)
+            {
+                SetupStream(message.Schema);
+            }
+
+            return _flightDataStream.Write(message, applicationMetadata);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (!_disposed)
+            {
+                _flightDataStream.Dispose();
+                _disposed = true;
+            }
+        }
+
+        ~FlightRecordBatchStreamWriter()

Review comment:
       We should remove any finalizers. The only reason to use a finalizer is 
when a class is directly managing native resources. And even then, we try to 
use SafeHandles when possible.

##########
File path: 
csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamReader.cs
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flatbuf;
+using Apache.Arrow.Flight.Protocol;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Internal
+{
+    /// <summary>
+    /// Stream of record batches
+    ///
+    /// Use MoveNext() and Current to iterate over the batches.
+    /// There are also gRPC helper functions such as ToListAsync() etc.
+    /// </summary>
+    public abstract class FlightRecordBatchStreamReader : 
IAsyncStreamReader<RecordBatch>

Review comment:
       Can this class also implement `IAsyncEnumerable<RecordBatch>`? Then 
callers can just directly `await foreach` it, without having to call 
`ReadAllAsync()`.
   
   My understanding is that `IAsyncStreamReader<T>` doesn't implement 
`IAsyncEnumerable<T>` because the gRPC interface came first, and it couldn't 
be changed without a breaking change. @JamesNK - is this the case?

##########
File path: 
csharp/src/Apache.Arrow.Flight/Internal/FlightRecordBatchStreamReader.cs
##########
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flatbuf;
+using Apache.Arrow.Flight.Protocol;
+using Apache.Arrow.Ipc;
+using Google.Protobuf;
+using Grpc.Core;
+
+namespace Apache.Arrow.Flight.Internal

Review comment:
       Can we move this to the root namespace? I don't really like public types 
in an `Internal` namespace. It feels like an anti-pattern to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to