This is an automated email from the ASF dual-hosted git repository.

brycemecum pushed a commit to branch maint-18.1.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit f42c5b0c240d7275ec28081aa9be71f17a61e3bf
Author: Adam Reeve <[email protected]>
AuthorDate: Wed Oct 16 03:09:41 2024 +1300

    GH-44361: [C#][Integration] Include .NET in Flight integration tests (#44377)
    
    ### Rationale for this change
    
    See #44361. This allows testing compatibility of the .NET Flight implementation with other Flight implementations.
    
    ### What changes are included in this PR?
    
    * Adds a new `Apache.Arrow.Flight.IntegrationTest` project that can run in server or client mode for Flight integration tests.
    * Includes the integration tests that send and then retrieve data defined in JSON files, but doesn't add any of the named scenarios.
    * Configures archery to include C# in the Flight integration tests, but skip all the named scenarios.
    * Also skips tests that use dictionary data due to #38045, and the empty data test due to #44363.
    
    ### Are these changes tested?
    
    These changes are tests.
    
    ### Are there any user-facing changes?
    
    No
    * GitHub Issue: #44361
    
    Authored-by: Adam Reeve <[email protected]>
    Signed-off-by: Curt Hagenlocher <[email protected]>
---
 csharp/Apache.Arrow.sln                            |   6 +
 .../FlightRecordBatchStreamWriter.cs               |   2 +-
 .../Apache.Arrow.Flight.IntegrationTest.csproj     |  18 +++
 .../FlightClientCommand.cs                         |  51 +++++++
 .../FlightServerCommand.cs                         |  68 +++++++++
 .../GrpcResolver.cs                                |  34 +++++
 .../JsonTestScenario.cs                            | 167 +++++++++++++++++++++
 .../Apache.Arrow.Flight.IntegrationTest/Program.cs |  67 +++++++++
 csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs |  11 +-
 .../TestFlightServer.cs                            |   8 +-
 dev/archery/archery/integration/datagen.py         |  23 ++-
 dev/archery/archery/integration/runner.py          |   9 +-
 dev/archery/archery/integration/tester_csharp.py   |  57 ++++++-
 dev/release/rat_exclude_files.txt                  |   1 +
 14 files changed, 502 insertions(+), 20 deletions(-)

diff --git a/csharp/Apache.Arrow.sln b/csharp/Apache.Arrow.sln
index 7e7f7c6331..0e569de1d6 100644
--- a/csharp/Apache.Arrow.sln
+++ b/csharp/Apache.Arrow.sln
@@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = 
"Apache.Arrow.Flight.Sql.Tes
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", 
"src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", 
"{2ADE087A-B424-4895-8CC5-10170D10BA62}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = 
"Apache.Arrow.Flight.IntegrationTest", 
"test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj",
 "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Any CPU = Debug|Any CPU
@@ -81,6 +83,10 @@ Global
                {2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
                {2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 
= Release|Any CPU
+               {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
+               {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
+               {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
+               {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 
= Release|Any CPU
        EndGlobalSection
        GlobalSection(SolutionProperties) = preSolution
                HideSolutionNode = FALSE
diff --git a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs 
b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
index f76f082245..7a8a6fd677 100644
--- a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
+++ b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
@@ -64,7 +64,7 @@ namespace Apache.Arrow.Flight
         {
             if (!_disposed)
             {
-                _flightDataStream.Dispose();
+                _flightDataStream?.Dispose();
                 _disposed = true;
             }
         }
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
new file mode 100644
index 0000000000..34030621b4
--- /dev/null
+++ 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+    <RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="System.CommandLine" 
Version="2.0.0-beta4.22272.1" />
+    <PackageReference Include="System.Text.Json" Version="8.0.5" />
+    <ProjectReference 
Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" />
+    <ProjectReference 
Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" />
+    <ProjectReference 
Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
new file mode 100644
index 0000000000..d9e0ff5230
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public class FlightClientCommand
+{
+    private readonly int _port;
+    private readonly string _scenario;
+    private readonly FileInfo _jsonFileInfo;
+
+    public FlightClientCommand(int port, string scenario, FileInfo 
jsonFileInfo)
+    {
+        _port = port;
+        _scenario = scenario;
+        _jsonFileInfo = jsonFileInfo;
+    }
+
+    public async Task Execute()
+    {
+        if (!string.IsNullOrEmpty(_scenario))
+        {
+            // No named scenarios are currently implemented
+            throw new Exception($"Scenario '{_scenario}' is not supported.");
+        }
+
+        if (!(_jsonFileInfo?.Exists ?? false))
+        {
+            throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
+        }
+
+        var scenario = new JsonTestScenario(_port, _jsonFileInfo);
+        await scenario.RunClient().ConfigureAwait(false);
+    }
+}
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
new file mode 100644
index 0000000000..c3a7694485
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Net;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.TestWeb;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.AspNetCore.Hosting.Server;
+using Microsoft.AspNetCore.Hosting.Server.Features;
+using Microsoft.AspNetCore.Server.Kestrel.Core;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public class FlightServerCommand
+{
+    private readonly string _scenario;
+
+    public FlightServerCommand(string scenario)
+    {
+        _scenario = scenario;
+    }
+
+    public async Task Execute()
+    {
+        if (!string.IsNullOrEmpty(_scenario))
+        {
+            // No named scenarios are currently implemented
+            throw new Exception($"Scenario '{_scenario}' is not supported.");
+        }
+
+        var host = Host.CreateDefaultBuilder()
+            .ConfigureWebHostDefaults(webBuilder =>
+            {
+                webBuilder
+                    .ConfigureKestrel(options =>
+                    {
+                        options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => 
l.Protocols = HttpProtocols.Http2);
+                    })
+                    .UseStartup<Startup>();
+            })
+            .Build();
+
+        await host.StartAsync().ConfigureAwait(false);
+
+        var addresses = 
host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses;
+        foreach (var address in addresses)
+        {
+            Console.WriteLine($"Server listening on {address}");
+        }
+
+        await host.WaitForShutdownAsync().ConfigureAwait(false);
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
new file mode 100644
index 0000000000..44b1075e7a
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Grpc.Net.Client.Balancer;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+/// <summary>
+/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight.
+/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme.
+/// </summary>
+public class GrpcTcpResolverFactory : ResolverFactory
+{
+    public override string Name => "grpc+tcp";
+
+    public override Resolver Create(ResolverOptions options)
+    {
+        return new StaticResolverFactory(
+                uri => new[] { new BalancerAddress(options.Address.Host, 
options.Address.Port) })
+            .Create(options);
+    }
+}
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
new file mode 100644
index 0000000000..f4f3ac28bf
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.IntegrationTest;
+using Apache.Arrow.Tests;
+using Apache.Arrow.Types;
+using Google.Protobuf;
+using Grpc.Net.Client;
+using Grpc.Core;
+using Grpc.Net.Client.Balancer;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+/// <summary>
+/// A test scenario defined using a JSON data file
+/// </summary>
+internal class JsonTestScenario
+{
+    private readonly int _serverPort;
+    private readonly FileInfo _jsonFile;
+    private readonly ServiceProvider _serviceProvider;
+
+    public JsonTestScenario(int serverPort, FileInfo jsonFile)
+    {
+        _serverPort = serverPort;
+        _jsonFile = jsonFile;
+
+        var services = new ServiceCollection();
+        services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory());
+        _serviceProvider = services.BuildServiceProvider();
+    }
+
+    public async Task RunClient()
+    {
+        var address = $"grpc+tcp://localhost:{_serverPort}";
+        using var channel = GrpcChannel.ForAddress(
+            address,
+            new GrpcChannelOptions
+            {
+                ServiceProvider = _serviceProvider,
+                Credentials = ChannelCredentials.Insecure
+            });
+        var client = new FlightClient(channel);
+
+        var descriptor = 
FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName);
+
+        var jsonFile = await 
JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false);
+        var schema = jsonFile.GetSchemaAndDictionaries(out 
Func<DictionaryType, IArrowArray> dictionaries);
+        var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, 
dictionaries)).ToArray();
+
+        // 1. Put the data to the server.
+        await UploadBatches(client, descriptor, batches).ConfigureAwait(false);
+
+        // 2. Get the ticket for the data.
+        var info = await client.GetInfo(descriptor).ConfigureAwait(false);
+        if (info.Endpoints.Count == 0)
+        {
+            throw new Exception("No endpoints received");
+        }
+
+        // 3. Stream data from the server, comparing individual batches.
+        foreach (var endpoint in info.Endpoints)
+        {
+            var locations = endpoint.Locations.ToArray();
+            if (locations.Length == 0)
+            {
+                // Can read with existing client
+                await ConsumeFlightLocation(client, endpoint.Ticket, 
batches).ConfigureAwait(false);
+            }
+            else
+            {
+                foreach (var location in locations)
+                {
+                    using var readChannel = GrpcChannel.ForAddress(
+                        location.Uri,
+                        new GrpcChannelOptions
+                        {
+                            ServiceProvider = _serviceProvider,
+                            Credentials = ChannelCredentials.Insecure
+                        });
+                    var readClient = new FlightClient(readChannel);
+                    await ConsumeFlightLocation(readClient, endpoint.Ticket, 
batches).ConfigureAwait(false);
+                }
+            }
+        }
+    }
+
+    private static async Task UploadBatches(FlightClient client, 
FlightDescriptor descriptor, RecordBatch[] batches)
+    {
+        using var putCall = client.StartPut(descriptor);
+        using var writer = putCall.RequestStream;
+
+        try
+        {
+            var counter = 0;
+            foreach (var batch in batches)
+            {
+                var metadata = $"{counter}";
+
+                await writer.WriteAsync(batch, 
ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false);
+
+                // Verify server has acknowledged the write request
+                await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
+                var responseString = 
putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8();
+
+                if (responseString != metadata)
+                {
+                    throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'");
+                }
+
+                counter++;
+            }
+        }
+        finally
+        {
+            await writer.CompleteAsync().ConfigureAwait(false);
+        }
+
+        // Drain the response stream to ensure the server has stored the data
+        var hasMore = await 
putCall.ResponseStream.MoveNext().ConfigureAwait(false);
+        if (hasMore)
+        {
+            throw new Exception("Expected to have reached the end of the response stream");
+        }
+    }
+
+    private static async Task ConsumeFlightLocation(FlightClient client, 
FlightTicket ticket, RecordBatch[] batches)
+    {
+        using var readStream = client.GetStream(ticket);
+        var counter = 0;
+        foreach (var originalBatch in batches)
+        {
+            if (!await 
readStream.ResponseStream.MoveNext().ConfigureAwait(false))
+            {
+                throw new Exception($"Expected {batches.Length} batches but received {counter}");
+            }
+
+            var batch = readStream.ResponseStream.Current;
+            ArrowReaderVerifier.CompareBatches(originalBatch, batch, 
strictCompare: false);
+
+            counter++;
+        }
+
+        if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
+        {
+            throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches");
+        }
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs
new file mode 100644
index 0000000000..24d39de28a
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.CommandLine;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public static class Program
+{
+    public static async Task<int> Main(string[] args)
+    {
+        var portOption = new Option<int>(
+            new[] { "--port", "-p" },
+            description: "Port the Flight server is listening on");
+        var scenarioOption = new Option<string>(
+            new[] { "--scenario", "-s" },
+            "The name of the scenario to run");
+        var pathOption = new Option<FileInfo>(
+            new[] { "--path", "-j" },
+            "Path to a JSON file of test data");
+
+        var rootCommand = new RootCommand(
+            "Integration test application for Apache.Arrow .NET Flight.");
+
+        var clientCommand = new Command("client", "Run the Flight client")
+        {
+            portOption,
+            scenarioOption,
+            pathOption,
+        };
+        rootCommand.AddCommand(clientCommand);
+
+        clientCommand.SetHandler(async (port, scenario, jsonFile) =>
+        {
+            var command = new FlightClientCommand(port, scenario, jsonFile);
+            await command.Execute().ConfigureAwait(false);
+        }, portOption, scenarioOption, pathOption);
+
+        var serverCommand = new Command("server", "Run the Flight server")
+        {
+            scenarioOption,
+        };
+        rootCommand.AddCommand(serverCommand);
+
+        serverCommand.SetHandler(async scenario =>
+        {
+            var command = new FlightServerCommand(scenario);
+            await command.Execute().ConfigureAwait(false);
+        }, scenarioOption);
+
+        return await rootCommand.InvokeAsync(args).ConfigureAwait(false);
+    }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs 
b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
index 97c1af2f06..d1cfe9e445 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
+++ b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
@@ -13,15 +13,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
 using Microsoft.AspNetCore.Builder;
 using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Http;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Console;
 
 namespace Apache.Arrow.Flight.TestWeb
 {
@@ -35,6 +33,11 @@ namespace Apache.Arrow.Flight.TestWeb
                 .AddFlightServer<TestFlightServer>();
 
             services.AddSingleton(new FlightStore());
+
+            // The integration tests rely on the port being written to the first line of stdout,
+            // so send all logging to stderr.
+            services.Configure<ConsoleLoggerOptions>(
+                o => o.LogToStandardErrorThreshold = LogLevel.Debug);
         }
 
         // This method gets called by the runtime. Use this method to 
configure the HTTP request pipeline.
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs 
b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
index 4a72b73274..46c5460912 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
+++ b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
@@ -67,14 +67,16 @@ namespace Apache.Arrow.Flight.TestWeb
 
             if(!_flightStore.Flights.TryGetValue(flightDescriptor, out var 
flightHolder))
             {
-                flightHolder = new FlightHolder(flightDescriptor, await requestStream.Schema, $"http://{context.Host}");
+                flightHolder = new FlightHolder(flightDescriptor, await 
requestStream.Schema, $"grpc+tcp://{context.Host}");
                 _flightStore.Flights.Add(flightDescriptor, flightHolder);
             }
 
             while (await requestStream.MoveNext())
             {
-                flightHolder.AddBatch(new 
RecordBatchWithMetadata(requestStream.Current, 
requestStream.ApplicationMetadata.FirstOrDefault()));
-                await responseStream.WriteAsync(FlightPutResult.Empty);
+                var applicationMetadata = 
requestStream.ApplicationMetadata.FirstOrDefault();
+                flightHolder.AddBatch(new 
RecordBatchWithMetadata(requestStream.Current, applicationMetadata));
+                await responseStream.WriteAsync(
+                    applicationMetadata == null ? FlightPutResult.Empty : new 
FlightPutResult(applicationMetadata));
             }
         }
 
diff --git a/dev/archery/archery/integration/datagen.py 
b/dev/archery/archery/integration/datagen.py
index 9f86d172dd..bc86296340 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -25,7 +25,7 @@ import tempfile
 import numpy as np
 
 from .util import frombytes, tobytes, random_bytes, random_utf8
-from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY
+from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY, SKIP_FLIGHT
 
 
 def metadata_key_values(pairs):
@@ -1890,7 +1890,10 @@ def get_generated_json_files(tempdir=None):
         return
 
     file_objs = [
-        generate_primitive_case([], name='primitive_no_batches'),
+        generate_primitive_case([], name='primitive_no_batches')
+        # TODO(https://github.com/apache/arrow/issues/44363)
+        .skip_format(SKIP_FLIGHT, 'C#'),
+
         generate_primitive_case([17, 20], name='primitive'),
         generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
 
@@ -1952,16 +1955,22 @@ def get_generated_json_files(tempdir=None):
 
         generate_dictionary_case()
         # TODO(https://github.com/apache/arrow-nanoarrow/issues/622)
-        .skip_tester('nanoarrow'),
+        .skip_tester('nanoarrow')
+        # TODO(https://github.com/apache/arrow/issues/38045)
+        .skip_format(SKIP_FLIGHT, 'C#'),
 
         generate_dictionary_unsigned_case()
         .skip_tester('nanoarrow')
-        .skip_tester('Java'),  # TODO(ARROW-9377)
+        .skip_tester('Java')  # TODO(ARROW-9377)
+        # TODO(https://github.com/apache/arrow/issues/38045)
+        .skip_format(SKIP_FLIGHT, 'C#'),
 
         generate_nested_dictionary_case()
         # TODO(https://github.com/apache/arrow-nanoarrow/issues/622)
         .skip_tester('nanoarrow')
-        .skip_tester('Java'),  # TODO(ARROW-7779)
+        .skip_tester('Java')  # TODO(ARROW-7779)
+        # TODO(https://github.com/apache/arrow/issues/38045)
+        .skip_format(SKIP_FLIGHT, 'C#'),
 
         generate_run_end_encoded_case()
         .skip_tester('C#')
@@ -1988,7 +1997,9 @@ def get_generated_json_files(tempdir=None):
         .skip_tester('nanoarrow')
         # TODO: ensure the extension is registered in the C++ entrypoint
         .skip_format(SKIP_C_SCHEMA, 'C++')
-        .skip_format(SKIP_C_ARRAY, 'C++'),
+        .skip_format(SKIP_C_ARRAY, 'C++')
+        # TODO(https://github.com/apache/arrow/issues/38045)
+        .skip_format(SKIP_FLIGHT, 'C#'),
     ]
 
     generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index e276738846..378b17d75f 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -631,10 +631,13 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
     flight_scenarios = [
         Scenario(
             "auth:basic_proto",
-            description="Authenticate using the BasicAuth protobuf."),
+            description="Authenticate using the BasicAuth protobuf.",
+            skip_testers={"C#"},
+        ),
         Scenario(
             "middleware",
             description="Ensure headers are propagated via middleware.",
+            skip_testers={"C#"},
         ),
         Scenario(
             "ordered",
@@ -689,12 +692,12 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
         Scenario(
             "flight_sql",
             description="Ensure Flight SQL protocol is working as expected.",
-            skip_testers={"Rust"}
+            skip_testers={"Rust", "C#"}
         ),
         Scenario(
             "flight_sql:extension",
             description="Ensure Flight SQL extensions work as expected.",
-            skip_testers={"Rust"}
+            skip_testers={"Rust", "C#"}
         ),
         Scenario(
             "flight_sql:ingestion",
diff --git a/dev/archery/archery/integration/tester_csharp.py 
b/dev/archery/archery/integration/tester_csharp.py
index 02ced0701d..50b3499fbf 100644
--- a/dev/archery/archery/integration/tester_csharp.py
+++ b/dev/archery/archery/integration/tester_csharp.py
@@ -17,6 +17,7 @@
 
 from contextlib import contextmanager
 import os
+import subprocess
 
 from . import cdata
 from .tester import Tester, CDataExporter, CDataImporter
@@ -25,12 +26,20 @@ from ..utils.source import ARROW_ROOT_DEFAULT
 
 
 _ARTIFACTS_PATH = os.path.join(ARROW_ROOT_DEFAULT, "csharp/artifacts")
+_BUILD_SUBDIR = "Debug/net8.0"
 
 _EXE_PATH = os.path.join(_ARTIFACTS_PATH,
                          "Apache.Arrow.IntegrationTest",
-                         "Debug/net8.0/Apache.Arrow.IntegrationTest",
+                         _BUILD_SUBDIR,
+                         "Apache.Arrow.IntegrationTest",
                          )
 
+_FLIGHT_EXE_PATH = os.path.join(_ARTIFACTS_PATH,
+                                "Apache.Arrow.Flight.IntegrationTest",
+                                _BUILD_SUBDIR,
+                                "Apache.Arrow.Flight.IntegrationTest",
+                                )
+
 _clr_loaded = False
 
 
@@ -44,10 +53,10 @@ def _load_clr():
         import clr
         clr.AddReference(
             f"{_ARTIFACTS_PATH}/Apache.Arrow.IntegrationTest/"
-            f"Debug/net8.0/Apache.Arrow.IntegrationTest.dll")
+            f"{_BUILD_SUBDIR}/Apache.Arrow.IntegrationTest.dll")
         clr.AddReference(
             f"{_ARTIFACTS_PATH}/Apache.Arrow.Tests/"
-            f"Debug/net8.0/Apache.Arrow.Tests.dll")
+            f"{_BUILD_SUBDIR}/Apache.Arrow.Tests.dll")
 
         from Apache.Arrow.IntegrationTest import CDataInterface
         CDataInterface.Initialize()
@@ -146,6 +155,8 @@ class CSharpCDataImporter(CDataImporter, _CDataBase):
 class CSharpTester(Tester):
     PRODUCER = True
     CONSUMER = True
+    FLIGHT_SERVER = True
+    FLIGHT_CLIENT = True
     C_DATA_SCHEMA_EXPORTER = True
     C_DATA_SCHEMA_IMPORTER = True
     C_DATA_ARRAY_EXPORTER = True
@@ -192,3 +203,43 @@ class CSharpTester(Tester):
 
     def make_c_data_importer(self):
         return CSharpCDataImporter(self.debug, self.args)
+
+    def flight_request(self, port, json_path=None, scenario_name=None):
+        cmd = [_FLIGHT_EXE_PATH, 'client', '--port', f'{port}']
+        if json_path:
+            cmd.extend(['--path', json_path])
+        elif scenario_name:
+            cmd.extend(['--scenario', scenario_name])
+        else:
+            raise TypeError("Must provide one of json_path or scenario_name")
+
+        if self.debug:
+            log(' '.join(cmd))
+        run_cmd(cmd)
+
+    @contextmanager
+    def flight_server(self, scenario_name=None):
+        cmd = [_FLIGHT_EXE_PATH, 'server']
+        if scenario_name:
+            cmd.extend(['--scenario', scenario_name])
+        if self.debug:
+            log(' '.join(cmd))
+        server = subprocess.Popen(
+            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        try:
+            output = server.stdout.readline().decode()
+            if not output.startswith("Server listening on "):
+                server.kill()
+                out, err = server.communicate()
+                raise RuntimeError(
+                    '.NET Flight server did not start properly, '
+                    'stdout: \n{}\n\nstderr:\n{}\n'.format(
+                        output + out.decode(), err.decode()
+                    )
+                )
+            port = int(output.split(':')[-1])
+            yield port
+        finally:
+            server.kill()
+            server.wait(5)
diff --git a/dev/release/rat_exclude_files.txt 
b/dev/release/rat_exclude_files.txt
index e149c17981..dda1d36dc1 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -116,6 +116,7 @@ 
csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj
 csharp/src/Apache.Arrow.Compression/Apache.Arrow.Compression.csproj
 csharp/src/Apache.Arrow.Flight.Sql/Apache.Arrow.Flight.Sql.csproj
 csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
+csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
 csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj
 csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj
 csharp/test/Apache.Arrow.Flight.TestWeb/Apache.Arrow.Flight.TestWeb.csproj

Reply via email to