westonpace commented on code in PR #36079:
URL: https://github.com/apache/arrow/pull/36079#discussion_r1265474505


##########
csharp/src/Apache.Arrow.Flight/Apache.Arrow.Flight.csproj:
##########
@@ -1,7 +1,7 @@
 <Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
-    <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
+    <TargetFrameworks>netstandard2.0;netstandard2.1;net6.0</TargetFrameworks>

Review Comment:
   Is there a particular need to add `net6.0` here?  Wouldn't `netstandard2.0` 
and `netstandard2.1` be compatible with `net6.0`?



##########
csharp/src/Apache.Arrow.Flight/Client/FlightClient.cs:
##########
@@ -93,6 +93,22 @@ public FlightRecordBatchDuplexStreamingCall 
StartPut(FlightDescriptor flightDesc
                 channels.Dispose);
         }
 
+        public AsyncDuplexStreamingCall<FlightHandshakeRequest, 
FlightHandshakeResponse> Handshake(Metadata headers = null)

Review Comment:
   I don't know much about Flight, but why does this method return an async 
call when the underlying client method (`Handshake`) is synchronous?



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+    [Theory]
+    [InlineData(FlightDescriptorType.Path, null, null)]
+    [InlineData(FlightDescriptorType.Command, "", null)]
+    [InlineData(FlightDescriptorType.Command, 
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
 typeof(CommandGetCatalogs))]

Review Comment:
   Is this a binary protobuf message?  It seems like this might be difficult 
to maintain.



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+    [Theory]
+    [InlineData(FlightDescriptorType.Path, null, null)]
+    [InlineData(FlightDescriptorType.Command, "", null)]
+    [InlineData(FlightDescriptorType.Command, 
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
 typeof(CommandGetCatalogs))]
+    public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType 
type, string? command, Type? expectedResult)
+    {
+        //Given
+        FlightDescriptor descriptor;
+        if (type == FlightDescriptorType.Command)
+        {
+            descriptor = command != null ? 
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
 : FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+        }
+        else
+        {
+            descriptor = 
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+        }
+
+        //When
+        var result = FlightSqlProducer.GetCommand(descriptor);
+
+        //Then
+        Assert.Equal(expectedResult, result?.GetType());
+    }
+
+    [Fact]
+    public async Task EnsureTheCorrectActionsAreGiven()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+        //When
+        await producer.ListActions(streamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+        var actions = streamWriter.Messages.ToArray();
+
+        Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+    }
+
+    [Theory]
+    [InlineData(false,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType)},
+        new[] {true, true, false, false})
+    ]
+    [InlineData(true,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type", 
"table_schema"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType), typeof(BinaryType)},
+        new[] {true, true, false, false, false})
+    ]
+    public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool 
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[] 
expectedIsNullable)
+    {
+        // Arrange
+
+        // Act
+        var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+        var fields = schema.FieldsList;
+
+        //Assert
+        Assert.False(schema.HasMetadata);
+        Assert.Equal(expectedNames.Length, fields.Count);
+        for (int i = 0; i < fields.Count; i++)
+        {
+            Assert.Equal(expectedNames[i], fields[i].Name);
+            Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+            Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+        }
+    }
+
+    #region FlightInfoTests
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPreparedStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCatalogs();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCatalogFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetDbSchemas();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetDbSchemaFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTables();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTablesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTableTypes();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTableTypesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetSqlInfo();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetPrimaryKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPrimaryKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetExportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetExportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetImportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetImportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCrossReference();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCrossReferenceFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetXdbcTypeInfo();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetXdbcTypeFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupportedAndHasNoDescriptor()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new BadCommand();
+
+        //When
+        var act = async () => await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("command type  not supported", exception.Message);
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementUpdate();
+
+        //When
+        var act = async () => await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("command type CommandPreparedStatementUpdate not 
supported", exception.Message);
+    }
+    #endregion
+
+    #region DoGetTests
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetPreparedStatementQuery", 
schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetSqlInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetSqlInfo();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetSqlInfo", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetCatalogs()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCatalogs();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetCatalog", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTableTypes()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTableTypes();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetTableType", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTables()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTables();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetTables", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetDbSchemas()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetDbSchemas();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetDbSchema", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetPrimaryKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetPrimaryKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetExportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetExportedKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetExportedKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetImportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetImportedKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetImportedKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetCrossReference()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCrossReference();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetCrossReference", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetXdbcTypeInfo();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetXbdcTypeInfo", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoGetCommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var ticket = new FlightTicket("");
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        var act = async () => await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new 
MockServerCallContext());;
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("Status(StatusCode=\"InvalidArgument\", Detail=\"DoGet 
command  is not supported.\")", exception?.Message);
+    }
+    #endregion
+
+    #region DoActionTests
+    [Fact]
+    public async void 
EnsureDoActionIsCorrectlyRoutedForTheActionCreateRequest()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction(SqlAction.CreateRequest, new 
ActionCreatePreparedStatementRequest().PackAndSerialize());
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        await producer.DoAction(action, mockStreamWriter, new 
MockServerCallContext());
+
+        //Then
+        Assert.Equal("CreatePreparedStatement", 
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void EnsureDoActionIsCorrectlyRoutedForTheActionCloseRequest()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction(SqlAction.CloseRequest, new 
ActionClosePreparedStatementRequest().PackAndSerialize());
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        await producer.DoAction(action, mockStreamWriter, new 
MockServerCallContext());
+
+        //Then
+        Assert.Equal("ClosePreparedStatement", 
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoActionCommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction("BadCommand");
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        var act = async () => await producer.DoAction(action, 
mockStreamWriter, new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("Action type BadCommand not supported", 
exception?.Message);
+    }
+    #endregion
+
+    #region DoPutTests
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandStatementUpdate()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandStatementUpdate();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutStatementUpdate", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutPreparedStatementQuery", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementUpdate()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementUpdate();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutPreparedStatementUpdate", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoPutCommandIsNotSupported()

Review Comment:
   Same question for the `EnsureAnInvalidOperation...` tests.



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+    [Theory]
+    [InlineData(FlightDescriptorType.Path, null, null)]
+    [InlineData(FlightDescriptorType.Command, "", null)]
+    [InlineData(FlightDescriptorType.Command, 
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
 typeof(CommandGetCatalogs))]
+    public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType 
type, string? command, Type? expectedResult)
+    {
+        //Given
+        FlightDescriptor descriptor;
+        if (type == FlightDescriptorType.Command)
+        {
+            descriptor = command != null ? 
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
 : FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+        }
+        else
+        {
+            descriptor = 
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+        }
+
+        //When
+        var result = FlightSqlProducer.GetCommand(descriptor);
+
+        //Then
+        Assert.Equal(expectedResult, result?.GetType());
+    }
+
+    [Fact]
+    public async Task EnsureTheCorrectActionsAreGiven()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+        //When
+        await producer.ListActions(streamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+        var actions = streamWriter.Messages.ToArray();
+
+        Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+    }
+
+    [Theory]
+    [InlineData(false,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType)},
+        new[] {true, true, false, false})
+    ]
+    [InlineData(true,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type", 
"table_schema"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType), typeof(BinaryType)},
+        new[] {true, true, false, false, false})
+    ]
+    public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool 
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[] 
expectedIsNullable)
+    {
+        // Arrange
+
+        // Act
+        var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+        var fields = schema.FieldsList;
+
+        //Assert
+        Assert.False(schema.HasMetadata);
+        Assert.Equal(expectedNames.Length, fields.Count);
+        for (int i = 0; i < fields.Count; i++)
+        {
+            Assert.Equal(expectedNames[i], fields[i].Name);
+            Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+            Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+        }
+    }
+
+    #region FlightInfoTests
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPreparedStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCatalogs();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCatalogFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetDbSchemas();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetDbSchemaFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTables();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTablesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTableTypes();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTableTypesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetSqlInfo();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetPrimaryKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPrimaryKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetExportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetExportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetImportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetImportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCrossReference();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCrossReferenceFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetXdbcTypeInfo();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetXdbcTypeFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupportedAndHasNoDescriptor()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new BadCommand();
+
+        //When
+        var act = async () => await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("command type  not supported", exception.Message);
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementUpdate();
+
+        //When
+        var act = async () => await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("command type CommandPreparedStatementUpdate not 
supported", exception.Message);
+    }
+    #endregion
+
+    #region DoGetTests
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetPreparedStatementQuery", 
schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetSqlInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetSqlInfo();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetSqlInfo", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetCatalogs()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCatalogs();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetCatalog", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTableTypes()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTableTypes();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetTableType", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTables()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTables();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetTables", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetDbSchemas()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetDbSchemas();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetDbSchema", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetPrimaryKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetPrimaryKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetExportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetExportedKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetExportedKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetImportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetImportedKeys();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetImportedKeys", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetCrossReference()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCrossReference();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetCrossReference", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureDoGetIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetXdbcTypeInfo();
+        var ticket = new FlightTicket(command.PackAndSerialize());
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+        //Then
+        var schema = await streamWriter.Messages.GetSchema();
+        Assert.Equal("DoGetXbdcTypeInfo", schema.FieldsList.First().Name);
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoGetCommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var ticket = new FlightTicket("");
+        var streamWriter = new MockServerStreamWriter<FlightData>();
+
+        //When
+        var act = async () => await producer.DoGet(ticket, new 
FlightServerRecordBatchStreamWriter(streamWriter), new 
MockServerCallContext());;
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("Status(StatusCode=\"InvalidArgument\", Detail=\"DoGet 
command  is not supported.\")", exception?.Message);
+    }
+    #endregion
+
+    #region DoActionTests
+    [Fact]
+    public async void 
EnsureDoActionIsCorrectlyRoutedForTheActionCreateRequest()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction(SqlAction.CreateRequest, new 
ActionCreatePreparedStatementRequest().PackAndSerialize());
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        await producer.DoAction(action, mockStreamWriter, new 
MockServerCallContext());
+
+        //Then
+        Assert.Equal("CreatePreparedStatement", 
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void EnsureDoActionIsCorrectlyRoutedForTheActionCloseRequest()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction(SqlAction.CloseRequest, new 
ActionClosePreparedStatementRequest().PackAndSerialize());
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        await producer.DoAction(action, mockStreamWriter, new 
MockServerCallContext());
+
+        //Then
+        Assert.Equal("ClosePreparedStatement", 
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoActionCommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var action = new FlightAction("BadCommand");
+        var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+        //When
+        var act = async () => await producer.DoAction(action, 
mockStreamWriter, new MockServerCallContext());
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("Action type BadCommand not supported", 
exception?.Message);
+    }
+    #endregion
+
+    #region DoPutTests
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandStatementUpdate()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandStatementUpdate();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutStatementUpdate", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutPreparedStatementQuery", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementUpdate()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementUpdate();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+
+        //Then
+        Assert.Equal("PutPreparedStatementUpdate", 
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+    }
+
+    [Fact]
+    public async void 
EnsureAnInvalidOperationExceptionIsThrownWhenADoPutCommandIsNotSupported()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetXdbcTypeInfo();
+        var reader = new 
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+        var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+        //When
+        var act = async () => await producer.DoPut(command, new 
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+        var exception = await Record.ExceptionAsync(act);
+
+        //Then
+        Assert.Equal("Command CommandGetXdbcTypeInfo not supported", 
exception?.Message);
+    }
+    #endregion
+
+    private class MockServerCallContext : ServerCallContext
+    {
+        protected override Task WriteResponseHeadersAsyncCore(Metadata 
responseHeaders) => throw new NotImplementedException();
+
+        protected override ContextPropagationToken 
CreatePropagationTokenCore(ContextPropagationOptions? options) => throw new 
NotImplementedException();
+
+        protected override string MethodCore { get; }

Review Comment:
   Can you fix these lint checks?



##########
csharp/src/Apache.Arrow.Flight/Sql/FlightSqlUtils.cs:
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using Apache.Arrow.Flight.Internal;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Apache.Arrow.Flight.Sql
+{
+    /// <summary>
+    /// Helper methods for doing common Flight Sql tasks and conversions
+    /// </summary>
+    public class FlightSqlUtils
+    {
+        public static readonly FlightActionType 
FlightSqlCreatePreparedStatement = new("CreatePreparedStatement",
+            "Creates a reusable prepared statement resource on the server. \n" 
+
+            "Request Message: ActionCreatePreparedStatementRequest\n" +
+            "Response Message: ActionCreatePreparedStatementResult");
+
+        public static readonly FlightActionType 
FlightSqlClosePreparedStatement = new("ClosePreparedStatement",
+            "Closes a reusable prepared statement resource on the server. \n" +
+            "Request Message: ActionClosePreparedStatementRequest\n" +
+            "Response Message: N/A");

Review Comment:
   Why are only these two actions defined?  Is the idea that the rest will be 
added in future PRs?  Or are these the only two that are handled by the base 
implementation and the assumption is that the others will be provided by the 
derived implementation itself?



##########
csharp/src/Apache.Arrow.Flight/FlightHandshakeRequest.cs:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight;
+
+public class FlightHandshakeRequest

Review Comment:
   What is the purpose of `FlightData`, `FlightHandshakeRequest`, and 
`FlightHandshakeResponse`?  They all seem to be rather direct clones of 
`Protocol.FlightData`, `Protocol.FlightHandshakeRequest`, and 
`Protocol.FlightHandshakeResponse`.



##########
csharp/src/Apache.Arrow.Flight/Server/FlightServer.cs:
##########
@@ -14,15 +14,25 @@
 // limitations under the License.
 
 using System;
-using System.Collections.Generic;
-using System.Text;
 using System.Threading.Tasks;
+using Apache.Arrow.Flight.Sql;
 using Grpc.Core;
 
 namespace Apache.Arrow.Flight.Server
 {
     public abstract class FlightServer
     {
+#if NET6_0_OR_GREATER
+        public FlightSqlProducer SqlProducer { init; protected internal get; }
+#else
+        protected internal FlightSqlProducer SqlProducer { get; }
+#endif

Review Comment:
   If the constructor were made public then couldn't you leave this property as 
`protected internal` and get the same semantics without the `#if` macros?
   
   Also, I'm not sure I understand using `#if NET6_0_OR_GREATER` to gate 
something that is controlled by the C# language version (e.g. C# >= 9.0) and 
not the framework version.  If we want to use C# 9.0 I think the maintainers 
should just all agree to use C# 9.0.  We don't need to multi-target language 
versions.



##########
csharp/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs:
##########
@@ -35,16 +33,33 @@ public FlightServerImplementation(FlightServer flightServer)
             _flightServer = flightServer;
         }
 
-        public override async Task DoPut(IAsyncStreamReader<FlightData> 
requestStream, IServerStreamWriter<Protocol.PutResult> responseStream, 
ServerCallContext context)
+        public override async Task 
DoPut(IAsyncStreamReader<Protocol.FlightData> requestStream, 
IServerStreamWriter<Protocol.PutResult> responseStream, ServerCallContext 
context)
         {
             var readStream = new 
FlightServerRecordBatchStreamReader(requestStream);
             var writeStream = new StreamWriter<FlightPutResult, 
Protocol.PutResult>(responseStream, putResult => putResult.ToProtocol());
-            await _flightServer.DoPut(readStream, writeStream, 
context).ConfigureAwait(false);
+
+            if (_flightServer.SqlProducer != null && await 
FlightSqlProducer.GetCommand(readStream).ConfigureAwait(false) is { } command)

Review Comment:
   I'm a little confused by this approach.  Wouldn't you want to have both a 
`FlightServerImplementation` and a `FlightSqlServerImplementation`?  Maybe the 
second could extend the first?
   
   The `FlightSqlServerImplementation` would then always have 
`_flightServer.SqlProducer != null`, and you wouldn't need an `if` check in each 
call.



##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+    [Theory]
+    [InlineData(FlightDescriptorType.Path, null, null)]
+    [InlineData(FlightDescriptorType.Command, "", null)]
+    [InlineData(FlightDescriptorType.Command, 
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
 typeof(CommandGetCatalogs))]
+    public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType 
type, string? command, Type? expectedResult)
+    {
+        //Given
+        FlightDescriptor descriptor;
+        if (type == FlightDescriptorType.Command)
+        {
+            descriptor = command != null ? 
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
 : FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+        }
+        else
+        {
+            descriptor = 
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+        }
+
+        //When
+        var result = FlightSqlProducer.GetCommand(descriptor);
+
+        //Then
+        Assert.Equal(expectedResult, result?.GetType());
+    }
+
+    [Fact]
+    public async Task EnsureTheCorrectActionsAreGiven()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+        //When
+        await producer.ListActions(streamWriter, new 
MockServerCallContext()).ConfigureAwait(false);
+        var actions = streamWriter.Messages.ToArray();
+
+        Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+    }
+
+    [Theory]
+    [InlineData(false,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType)},
+        new[] {true, true, false, false})
+    ]
+    [InlineData(true,
+        new[] {"catalog_name", "db_schema_name", "table_name", "table_type", 
"table_schema"},
+        new[] {typeof(StringType), typeof(StringType), typeof(StringType), 
typeof(StringType), typeof(BinaryType)},
+        new[] {true, true, false, false, false})
+    ]
+    public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool 
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[] 
expectedIsNullable)
+    {
+        // Arrange
+
+        // Act
+        var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+        var fields = schema.FieldsList;
+
+        //Assert
+        Assert.False(schema.HasMetadata);
+        Assert.Equal(expectedNames.Length, fields.Count);
+        for (int i = 0; i < fields.Count; i++)
+        {
+            Assert.Equal(expectedNames[i], fields[i].Name);
+            Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+            Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+        }
+    }
+
+    #region FlightInfoTests
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandPreparedStatementQuery();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPreparedStatementQueryFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCatalogs();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCatalogFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetDbSchemas();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetDbSchemaFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTables();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTablesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetTableTypes();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetTableTypesFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetSqlInfo();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetPrimaryKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetPrimaryKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetExportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetExportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetImportedKeys();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetImportedKeysFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+    {
+        //Given
+        var producer = new TestFlightSqlProducer();
+        var command = new CommandGetCrossReference();
+
+        //When
+        var flightInfo = await producer.GetFlightInfo(command, 
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+        //Then
+        Assert.Equal("GetCrossReferenceFlightInfo", 
flightInfo.Descriptor.Paths.First());
+    }
+
+    [Fact]
+    public async void 
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()

Review Comment:
   I wonder if all of these `EnsureGetFlightInfoIsCorrectlyRouted...` tests could 
be collapsed into a single `[Theory]` parameterized with `[InlineData]`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to