westonpace commented on code in PR #36079:
URL: https://github.com/apache/arrow/pull/36079#discussion_r1265474505
##########
csharp/src/Apache.Arrow.Flight/Apache.Arrow.Flight.csproj:
##########
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
- <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
+ <TargetFrameworks>netstandard2.0;netstandard2.1;net6.0</TargetFrameworks>
Review Comment:
Is there a particular need to add `net6.0` here? Wouldn't `netstandard2.0`
and `netstandard2.1` be compatible with `net6.0`?
##########
csharp/src/Apache.Arrow.Flight/Client/FlightClient.cs:
##########
@@ -93,6 +93,22 @@ public FlightRecordBatchDuplexStreamingCall
StartPut(FlightDescriptor flightDesc
channels.Dispose);
}
+ public AsyncDuplexStreamingCall<FlightHandshakeRequest,
FlightHandshakeResponse> Handshake(Metadata headers = null)
Review Comment:
I don't know much about flight but why does this method return a async call
when the underlying client method (`Handshake`) is synchronous?
##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+ [Theory]
+ [InlineData(FlightDescriptorType.Path, null, null)]
+ [InlineData(FlightDescriptorType.Command, "", null)]
+ [InlineData(FlightDescriptorType.Command,
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
typeof(CommandGetCatalogs))]
Review Comment:
Is this a binary protobuf message? it seems like this might be something
difficult to maintain.
##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+ [Theory]
+ [InlineData(FlightDescriptorType.Path, null, null)]
+ [InlineData(FlightDescriptorType.Command, "", null)]
+ [InlineData(FlightDescriptorType.Command,
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
typeof(CommandGetCatalogs))]
+ public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType
type, string? command, Type? expectedResult)
+ {
+ //Given
+ FlightDescriptor descriptor;
+ if (type == FlightDescriptorType.Command)
+ {
+ descriptor = command != null ?
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
: FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+ }
+ else
+ {
+ descriptor =
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+ }
+
+ //When
+ var result = FlightSqlProducer.GetCommand(descriptor);
+
+ //Then
+ Assert.Equal(expectedResult, result?.GetType());
+ }
+
+ [Fact]
+ public async Task EnsureTheCorrectActionsAreGiven()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+ //When
+ await producer.ListActions(streamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+ var actions = streamWriter.Messages.ToArray();
+
+ Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+ }
+
+ [Theory]
+ [InlineData(false,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType)},
+ new[] {true, true, false, false})
+ ]
+ [InlineData(true,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type",
"table_schema"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType), typeof(BinaryType)},
+ new[] {true, true, false, false, false})
+ ]
+ public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[]
expectedIsNullable)
+ {
+ // Arrange
+
+ // Act
+ var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+ var fields = schema.FieldsList;
+
+ //Assert
+ Assert.False(schema.HasMetadata);
+ Assert.Equal(expectedNames.Length, fields.Count);
+ for (int i = 0; i < fields.Count; i++)
+ {
+ Assert.Equal(expectedNames[i], fields[i].Name);
+ Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+ Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+ }
+ }
+
+ #region FlightInfoTests
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPreparedStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCatalogs();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCatalogFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetDbSchemas();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetDbSchemaFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTables();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTablesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTableTypes();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTableTypesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetSqlInfo();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetPrimaryKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPrimaryKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetExportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetExportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetImportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetImportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCrossReference();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCrossReferenceFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetXdbcTypeInfo();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetXdbcTypeFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupportedAndHasNoDescriptor()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new BadCommand();
+
+ //When
+ var act = async () => await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("command type not supported", exception.Message);
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementUpdate();
+
+ //When
+ var act = async () => await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("command type CommandPreparedStatementUpdate not
supported", exception.Message);
+ }
+ #endregion
+
+ #region DoGetTests
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetPreparedStatementQuery",
schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetSqlInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetSqlInfo();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetSqlInfo", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetCatalogs()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCatalogs();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetCatalog", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTableTypes()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTableTypes();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetTableType", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTables()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTables();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetTables", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetDbSchemas()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetDbSchemas();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetDbSchema", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetPrimaryKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetPrimaryKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetExportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetExportedKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetExportedKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetImportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetImportedKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetImportedKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetCrossReference()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCrossReference();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetCrossReference", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetXdbcTypeInfo();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetXbdcTypeInfo", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoGetCommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var ticket = new FlightTicket("");
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ var act = async () => await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new
MockServerCallContext());;
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("Status(StatusCode=\"InvalidArgument\", Detail=\"DoGet
command is not supported.\")", exception?.Message);
+ }
+ #endregion
+
+ #region DoActionTests
+ [Fact]
+ public async void
EnsureDoActionIsCorrectlyRoutedForTheActionCreateRequest()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction(SqlAction.CreateRequest, new
ActionCreatePreparedStatementRequest().PackAndSerialize());
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ await producer.DoAction(action, mockStreamWriter, new
MockServerCallContext());
+
+ //Then
+ Assert.Equal("CreatePreparedStatement",
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void EnsureDoActionIsCorrectlyRoutedForTheActionCloseRequest()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction(SqlAction.CloseRequest, new
ActionClosePreparedStatementRequest().PackAndSerialize());
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ await producer.DoAction(action, mockStreamWriter, new
MockServerCallContext());
+
+ //Then
+ Assert.Equal("ClosePreparedStatement",
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoActionCommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction("BadCommand");
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ var act = async () => await producer.DoAction(action,
mockStreamWriter, new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("Action type BadCommand not supported",
exception?.Message);
+ }
+ #endregion
+
+ #region DoPutTests
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandStatementUpdate()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandStatementUpdate();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutStatementUpdate",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutPreparedStatementQuery",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementUpdate()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementUpdate();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutPreparedStatementUpdate",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoPutCommandIsNotSupported()
Review Comment:
Same question for the `EnsureAnInvalidOperation...`
##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+ [Theory]
+ [InlineData(FlightDescriptorType.Path, null, null)]
+ [InlineData(FlightDescriptorType.Command, "", null)]
+ [InlineData(FlightDescriptorType.Command,
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
typeof(CommandGetCatalogs))]
+ public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType
type, string? command, Type? expectedResult)
+ {
+ //Given
+ FlightDescriptor descriptor;
+ if (type == FlightDescriptorType.Command)
+ {
+ descriptor = command != null ?
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
: FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+ }
+ else
+ {
+ descriptor =
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+ }
+
+ //When
+ var result = FlightSqlProducer.GetCommand(descriptor);
+
+ //Then
+ Assert.Equal(expectedResult, result?.GetType());
+ }
+
+ [Fact]
+ public async Task EnsureTheCorrectActionsAreGiven()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+ //When
+ await producer.ListActions(streamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+ var actions = streamWriter.Messages.ToArray();
+
+ Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+ }
+
+ [Theory]
+ [InlineData(false,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType)},
+ new[] {true, true, false, false})
+ ]
+ [InlineData(true,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type",
"table_schema"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType), typeof(BinaryType)},
+ new[] {true, true, false, false, false})
+ ]
+ public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[]
expectedIsNullable)
+ {
+ // Arrange
+
+ // Act
+ var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+ var fields = schema.FieldsList;
+
+ //Assert
+ Assert.False(schema.HasMetadata);
+ Assert.Equal(expectedNames.Length, fields.Count);
+ for (int i = 0; i < fields.Count; i++)
+ {
+ Assert.Equal(expectedNames[i], fields[i].Name);
+ Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+ Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+ }
+ }
+
+ #region FlightInfoTests
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPreparedStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCatalogs();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCatalogFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetDbSchemas();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetDbSchemaFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTables();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTablesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTableTypes();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTableTypesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetSqlInfo();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetPrimaryKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPrimaryKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetExportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetExportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetImportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetImportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCrossReference();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCrossReferenceFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetXdbcTypeInfo();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetXdbcTypeFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupportedAndHasNoDescriptor()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new BadCommand();
+
+ //When
+ var act = async () => await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("command type not supported", exception.Message);
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenACommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementUpdate();
+
+ //When
+ var act = async () => await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("command type CommandPreparedStatementUpdate not
supported", exception.Message);
+ }
+ #endregion
+
+ #region DoGetTests
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetPreparedStatementQuery",
schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetSqlInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetSqlInfo();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetSqlInfo", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetCatalogs()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCatalogs();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetCatalog", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTableTypes()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTableTypes();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetTableType", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetTables()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTables();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetTables", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetDbSchemas()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetDbSchemas();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetDbSchema", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void EnsureDoGetIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetPrimaryKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetPrimaryKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetExportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetExportedKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetExportedKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetImportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetImportedKeys();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetImportedKeys", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetCrossReference()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCrossReference();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetCrossReference", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureDoGetIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetXdbcTypeInfo();
+ var ticket = new FlightTicket(command.PackAndSerialize());
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new MockServerCallContext());
+
+ //Then
+ var schema = await streamWriter.Messages.GetSchema();
+ Assert.Equal("DoGetXbdcTypeInfo", schema.FieldsList.First().Name);
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoGetCommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var ticket = new FlightTicket("");
+ var streamWriter = new MockServerStreamWriter<FlightData>();
+
+ //When
+ var act = async () => await producer.DoGet(ticket, new
FlightServerRecordBatchStreamWriter(streamWriter), new
MockServerCallContext());;
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("Status(StatusCode=\"InvalidArgument\", Detail=\"DoGet
command is not supported.\")", exception?.Message);
+ }
+ #endregion
+
+ #region DoActionTests
+ [Fact]
+ public async void
EnsureDoActionIsCorrectlyRoutedForTheActionCreateRequest()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction(SqlAction.CreateRequest, new
ActionCreatePreparedStatementRequest().PackAndSerialize());
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ await producer.DoAction(action, mockStreamWriter, new
MockServerCallContext());
+
+ //Then
+ Assert.Equal("CreatePreparedStatement",
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void EnsureDoActionIsCorrectlyRoutedForTheActionCloseRequest()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction(SqlAction.CloseRequest, new
ActionClosePreparedStatementRequest().PackAndSerialize());
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ await producer.DoAction(action, mockStreamWriter, new
MockServerCallContext());
+
+ //Then
+ Assert.Equal("ClosePreparedStatement",
mockStreamWriter.Messages.First().Body.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoActionCommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var action = new FlightAction("BadCommand");
+ var mockStreamWriter = new MockStreamWriter<FlightResult>();
+
+ //When
+ var act = async () => await producer.DoAction(action,
mockStreamWriter, new MockServerCallContext());
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("Action type BadCommand not supported",
exception?.Message);
+ }
+ #endregion
+
+ #region DoPutTests
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandStatementUpdate()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandStatementUpdate();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutStatementUpdate",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutPreparedStatementQuery",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureDoPutIsCorrectlyRoutedForTheCommandPreparedStatementUpdate()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementUpdate();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+
+ //Then
+ Assert.Equal("PutPreparedStatementUpdate",
mockStreamWriter.Messages[0].ApplicationMetadata.ToStringUtf8());
+ }
+
+ [Fact]
+ public async void
EnsureAnInvalidOperationExceptionIsThrownWhenADoPutCommandIsNotSupported()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetXdbcTypeInfo();
+ var reader = new
MockStreamReader<FlightData>(System.Array.Empty<FlightData>());
+ var mockStreamWriter = new MockServerStreamWriter<FlightPutResult>();
+
+ //When
+ var act = async () => await producer.DoPut(command, new
FlightServerRecordBatchStreamReader(reader), mockStreamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+ var exception = await Record.ExceptionAsync(act);
+
+ //Then
+ Assert.Equal("Command CommandGetXdbcTypeInfo not supported",
exception?.Message);
+ }
+ #endregion
+
+ private class MockServerCallContext : ServerCallContext
+ {
+ protected override Task WriteResponseHeadersAsyncCore(Metadata
responseHeaders) => throw new NotImplementedException();
+
+ protected override ContextPropagationToken
CreatePropagationTokenCore(ContextPropagationOptions? options) => throw new
NotImplementedException();
+
+ protected override string MethodCore { get; }
Review Comment:
Can you fix these lint checks?
##########
csharp/src/Apache.Arrow.Flight/Sql/FlightSqlUtils.cs:
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.Diagnostics;
+using Apache.Arrow.Flight.Internal;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.WellKnownTypes;
+
+namespace Apache.Arrow.Flight.Sql
+{
+ /// <summary>
+ /// Helper methods for doing common Flight Sql tasks and conversions
+ /// </summary>
+ public class FlightSqlUtils
+ {
+ public static readonly FlightActionType
FlightSqlCreatePreparedStatement = new("CreatePreparedStatement",
+ "Creates a reusable prepared statement resource on the server. \n"
+
+ "Request Message: ActionCreatePreparedStatementRequest\n" +
+ "Response Message: ActionCreatePreparedStatementResult");
+
+ public static readonly FlightActionType
FlightSqlClosePreparedStatement = new("ClosePreparedStatement",
+ "Closes a reusable prepared statement resource on the server. \n" +
+ "Request Message: ActionClosePreparedStatementRequest\n" +
+ "Response Message: N/A");
Review Comment:
Why are only these two actions defined? Is the idea that the rest will be
added in future PRs? Or are these the only two that are handled by the base
implementation and the assumption is that the others will be provided by the
derived implementation itself?
##########
csharp/src/Apache.Arrow.Flight/FlightHandshakeRequest.cs:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Google.Protobuf;
+
+namespace Apache.Arrow.Flight;
+
+public class FlightHandshakeRequest
Review Comment:
What is the purpose of `FlightData`, `FlightHandshakeRequest`, and
`FlightHandshakeResponse`? They seem to all be rather direct clones of
`Protocol.FlightData`, `Protocol.FlightHandshakeRequest`, and
`Protocol.FlightHandshakeResponse`?
##########
csharp/src/Apache.Arrow.Flight/Server/FlightServer.cs:
##########
@@ -14,15 +14,25 @@
// limitations under the License.
using System;
-using System.Collections.Generic;
-using System.Text;
using System.Threading.Tasks;
+using Apache.Arrow.Flight.Sql;
using Grpc.Core;
namespace Apache.Arrow.Flight.Server
{
public abstract class FlightServer
{
+#if NET6_0_OR_GREATER
+ public FlightSqlProducer SqlProducer { init; protected internal get; }
+#else
+ protected internal FlightSqlProducer SqlProducer { get; }
+#endif
Review Comment:
If the constructor were made public then couldn't you leave this property as
`protected internal` and get the same semantics without the `#if` macros?
Also, I'm not sure I understand using `#if NET6_0_OR_GREATER` to gate
something that is controlled by the C# language version (e.g. C# >= 9.0) and
not the framework version. If we want to use C# 9.0 I think the maintainers
should just all agree to use C# 9.0. We don't need to multi-target language
versions.
##########
csharp/src/Apache.Arrow.Flight/Server/Internal/FlightServerImplementation.cs:
##########
@@ -35,16 +33,33 @@ public FlightServerImplementation(FlightServer flightServer)
_flightServer = flightServer;
}
- public override async Task DoPut(IAsyncStreamReader<FlightData>
requestStream, IServerStreamWriter<Protocol.PutResult> responseStream,
ServerCallContext context)
+ public override async Task
DoPut(IAsyncStreamReader<Protocol.FlightData> requestStream,
IServerStreamWriter<Protocol.PutResult> responseStream, ServerCallContext
context)
{
var readStream = new
FlightServerRecordBatchStreamReader(requestStream);
var writeStream = new StreamWriter<FlightPutResult,
Protocol.PutResult>(responseStream, putResult => putResult.ToProtocol());
- await _flightServer.DoPut(readStream, writeStream,
context).ConfigureAwait(false);
+
+ if (_flightServer.SqlProducer != null && await
FlightSqlProducer.GetCommand(readStream).ConfigureAwait(false) is { } command)
Review Comment:
I'm a little confused by this approach. Wouldn't you want to have both a
`FlightServerImplementation` and a `FlightSqlServerImplementation`? Maybe the
second could extend the first?
The `FlightSqlServerImplementation` would then always have
`_flightServer.SqlProducer != null` and you wouldn't need an if check in each
call?
##########
csharp/test/Apache.Arrow.Flight.Sql.Tests/FlightSqlProducerTests.cs:
##########
@@ -0,0 +1,722 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+using Apache.Arrow.Types;
+using Arrow.Flight.Protocol.Sql;
+using Google.Protobuf;
+using Google.Protobuf.Reflection;
+using Grpc.Core;
+using Xunit;
+
+namespace Apache.Arrow.Flight.Sql.Tests;
+
+public class FlightSqlProducerTests
+{
+ [Theory]
+ [InlineData(FlightDescriptorType.Path, null, null)]
+ [InlineData(FlightDescriptorType.Command, "", null)]
+ [InlineData(FlightDescriptorType.Command,
"CkB0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuQ29tbWFuZEdldENhdGFsb2dz",
typeof(CommandGetCatalogs))]
+ public void EnsureGetCommandReturnsTheCorrectResponse(FlightDescriptorType
type, string? command, Type? expectedResult)
+ {
+ //Given
+ FlightDescriptor descriptor;
+ if (type == FlightDescriptorType.Command)
+ {
+ descriptor = command != null ?
FlightDescriptor.CreateCommandDescriptor(ByteString.FromBase64(command).ToByteArray())
: FlightDescriptor.CreateCommandDescriptor(ByteString.Empty.ToStringUtf8());
+ }
+ else
+ {
+ descriptor =
FlightDescriptor.CreatePathDescriptor(System.Array.Empty<string>());
+ }
+
+ //When
+ var result = FlightSqlProducer.GetCommand(descriptor);
+
+ //Then
+ Assert.Equal(expectedResult, result?.GetType());
+ }
+
+ [Fact]
+ public async Task EnsureTheCorrectActionsAreGiven()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var streamWriter = new MockServerStreamWriter<FlightActionType>();
+
+ //When
+ await producer.ListActions(streamWriter, new
MockServerCallContext()).ConfigureAwait(false);
+ var actions = streamWriter.Messages.ToArray();
+
+ Assert.Equal(FlightSqlUtils.FlightSqlActions, actions);
+ }
+
+ [Theory]
+ [InlineData(false,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType)},
+ new[] {true, true, false, false})
+ ]
+ [InlineData(true,
+ new[] {"catalog_name", "db_schema_name", "table_name", "table_type",
"table_schema"},
+ new[] {typeof(StringType), typeof(StringType), typeof(StringType),
typeof(StringType), typeof(BinaryType)},
+ new[] {true, true, false, false, false})
+ ]
+ public void EnsureTableSchemaIsCorrectWithoutTableSchema(bool
includeTableSchemaField, string[] expectedNames, Type[] expectedTypes, bool[]
expectedIsNullable)
+ {
+ // Arrange
+
+ // Act
+ var schema = FlightSqlProducer.GetTableSchema(includeTableSchemaField);
+ var fields = schema.FieldsList;
+
+ //Assert
+ Assert.False(schema.HasMetadata);
+ Assert.Equal(expectedNames.Length, fields.Count);
+ for (int i = 0; i < fields.Count; i++)
+ {
+ Assert.Equal(expectedNames[i], fields[i].Name);
+ Assert.Equal(expectedTypes[i], fields[i].DataType.GetType());
+ Assert.Equal(expectedIsNullable[i], fields[i].IsNullable);
+ }
+ }
+
+ #region FlightInfoTests
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandPreparedStatementQuery()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandPreparedStatementQuery();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPreparedStatementQueryFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCatalogs()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCatalogs();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCatalogFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetDbSchemas()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetDbSchemas();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetDbSchemaFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTables()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTables();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTablesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetTableTypes()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetTableTypes();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetTableTypesFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetSqlInfo()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetSqlInfo();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetSqlFlightInfo", flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetPrimaryKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetPrimaryKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetPrimaryKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetExportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetExportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetExportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetImportedKeys()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetImportedKeys();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetImportedKeysFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetCrossReference()
+ {
+ //Given
+ var producer = new TestFlightSqlProducer();
+ var command = new CommandGetCrossReference();
+
+ //When
+ var flightInfo = await producer.GetFlightInfo(command,
FlightDescriptor.CreatePathDescriptor(""), new MockServerCallContext());
+
+ //Then
+ Assert.Equal("GetCrossReferenceFlightInfo",
flightInfo.Descriptor.Paths.First());
+ }
+
+ [Fact]
+ public async void
EnsureGetFlightInfoIsCorrectlyRoutedForTheCommandGetXdbcTypeInfo()
Review Comment:
I wonder if all of these `EnsureGetFlightInfoIsCorrectlyRouted` tests could
be collapsed into a single test parameterized with `InlineData`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]