This is an automated email from the ASF dual-hosted git repository.

brycemecum pushed a commit to branch maint-18.1.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit a803dd9df691f7816239ef3813bc507d87ca5ed9
Author: Adam Reeve <[email protected]>
AuthorDate: Wed Oct 16 13:33:23 2024 +1300

    GH-44360: [C#] Fix Flight DoExchange incompatibility with C++ 
implementation (#44424)
    
    ### Rationale for this change
    
    See #44360
    
    ### What changes are included in this PR?
    
    * Adds a new integration test to allow testing `do_exchange` between 
C++/Python and .NET.
    * Updates the Flight stream reader to handle when a descriptor is sent in 
the first message without any schema.
    
    ### Are these changes tested?
    
    * Yes, using the new integration test.
    
    ### Are there any user-facing changes?
    
    No
    * GitHub Issue: #44360
    
    Authored-by: Adam Reeve <[email protected]>
    Signed-off-by: Curt Hagenlocher <[email protected]>
---
 .../flight/integration_tests/test_integration.cc   | 129 +++++++++++++++++++++
 .../Internal/RecordBatchReaderImplementation.cs    |  61 +++++-----
 .../FlightClientCommand.cs                         |  18 ++-
 .../FlightServerCommand.cs                         |  33 +++++-
 .../IScenario.cs                                   |  35 ++++++
 .../Scenarios/DoExchangeEchoScenario.cs            | 122 +++++++++++++++++++
 .../{ => Scenarios}/JsonTestScenario.cs            |  23 ++--
 .../Startup.cs                                     |  20 +---
 csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs |   7 --
 .../test/Apache.Arrow.Flight.Tests/FlightTests.cs  |   1 -
 dev/archery/archery/integration/runner.py          |   6 +
 docs/source/status.rst                             |  19 ++-
 12 files changed, 385 insertions(+), 89 deletions(-)

diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc 
b/cpp/src/arrow/flight/integration_tests/test_integration.cc
index da6fcf81eb..f38076822c 100644
--- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
+++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -19,6 +19,7 @@
 
 #include <iostream>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -1026,6 +1027,131 @@ class AppMetadataFlightInfoEndpointScenario : public 
Scenario {
   }
 };
 
+/// \brief The server used for testing do_exchange
+class DoExchangeServer : public FlightServerBase {
+ public:
+  DoExchangeServer() : FlightServerBase() {}
+
+  Status DoExchange(const ServerCallContext& context,
+                    std::unique_ptr<FlightMessageReader> reader,
+                    std::unique_ptr<FlightMessageWriter> writer) override {
+    if (reader->descriptor().type != FlightDescriptor::DescriptorType::CMD) {
+      return Status::Invalid("Must provide a command descriptor");
+    }
+
+    const std::string& cmd = reader->descriptor().cmd;
+    if (cmd == "echo") {
+      return RunEchoExchange(reader, writer);
+    } else {
+      return Status::NotImplemented("Command not implemented: ", cmd);
+    }
+  }
+
+ private:
+  static Status RunEchoExchange(std::unique_ptr<FlightMessageReader>& reader,
+                                std::unique_ptr<FlightMessageWriter>& writer) {
+    FlightStreamChunk chunk;
+    bool begun = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(chunk, reader->Next());
+      if (!chunk.data && !chunk.app_metadata) {
+        break;
+      }
+      if (!begun && chunk.data) {
+        begun = true;
+        RETURN_NOT_OK(writer->Begin(chunk.data->schema()));
+      }
+      if (chunk.data && chunk.app_metadata) {
+        RETURN_NOT_OK(writer->WriteWithMetadata(*chunk.data, 
chunk.app_metadata));
+      } else if (chunk.data) {
+        RETURN_NOT_OK(writer->WriteRecordBatch(*chunk.data));
+      } else if (chunk.app_metadata) {
+        RETURN_NOT_OK(writer->WriteMetadata(chunk.app_metadata));
+      }
+    }
+    return Status::OK();
+  }
+};
+
+/// \brief The DoExchangeEcho scenario.
+///
+/// This tests that the client and server can perform a two-way data exchange.
+///
+/// The server should echo back any data sent by the client.
+class DoExchangeEchoScenario : public Scenario {
+  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+                    FlightServerOptions* options) override {
+    *server = std::make_unique<DoExchangeServer>();
+    return Status::OK();
+  }
+
+  Status MakeClient(FlightClientOptions* options) override { return 
Status::OK(); }
+
+  Status RunClient(std::unique_ptr<FlightClient> client) override {
+    auto descriptor = FlightDescriptor::Command("echo");
+    FlightCallOptions call_options;
+
+    ARROW_ASSIGN_OR_RAISE(auto do_exchange_result,
+                          client->DoExchange(call_options, descriptor));
+    std::unique_ptr<FlightStreamWriter> writer = 
std::move(do_exchange_result.writer);
+    std::unique_ptr<FlightStreamReader> reader = 
std::move(do_exchange_result.reader);
+
+    auto schema = arrow::schema({field("x", int32(), false)});
+    ARROW_RETURN_NOT_OK(writer->Begin(schema));
+
+    ARROW_ASSIGN_OR_RAISE(auto builder,
+                          RecordBatchBuilder::Make(schema, 
arrow::default_memory_pool()));
+
+    for (int batch_idx = 0; batch_idx < 4; ++batch_idx) {
+      auto int_builder = builder->GetFieldAs<Int32Builder>(0);
+      std::vector<int32_t> batch_data(10);
+      std::iota(batch_data.begin(), batch_data.end(), batch_idx);
+      ARROW_RETURN_NOT_OK(int_builder->AppendValues(batch_data));
+      ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush());
+
+      std::string app_metadata = std::to_string(batch_idx);
+      bool write_metadata = batch_idx % 2 == 0;
+
+      if (write_metadata) {
+        ARROW_RETURN_NOT_OK(
+            writer->WriteWithMetadata(*record_batch, 
Buffer::FromString(app_metadata)));
+      } else {
+        ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto read_result, reader->Next());
+      if (read_result.data == nullptr) {
+        return Status::Invalid("Received null data");
+      }
+      if (!read_result.data->Equals(*record_batch)) {
+        return Status::Invalid("Read data doesn't match expected data for 
batch ",
+                               std::to_string(batch_idx), ".\n", "Expected:\n",
+                               record_batch->ToString(), "Actual:\n",
+                               read_result.data->ToString());
+      }
+
+      if (write_metadata) {
+        if (read_result.app_metadata == nullptr) {
+          return Status::Invalid("Received null app metadata");
+        }
+        if (read_result.app_metadata->ToString() != app_metadata) {
+          return Status::Invalid("Read metadata doesn't match expected for 
batch ",
+                                 std::to_string(batch_idx), ".\n", 
"Expected:\n",
+                                 app_metadata, "\nActual:\n",
+                                 read_result.app_metadata->ToString());
+        }
+      } else if (read_result.app_metadata != nullptr) {
+        return Status::Invalid("Expected no app metadata but received non-null 
metadata");
+      }
+    }
+
+    ARROW_RETURN_NOT_OK(writer->DoneWriting());
+    ARROW_RETURN_NOT_OK(writer->Close());
+
+    return Status::OK();
+  }
+};
+
 /// \brief Schema to be returned for mocking the statement/prepared statement 
results.
 ///
 /// Must be the same across all languages.
@@ -2283,6 +2409,9 @@ Status GetScenario(const std::string& scenario_name, 
std::shared_ptr<Scenario>*
   } else if (scenario_name == "app_metadata_flight_info_endpoint") {
     *out = std::make_shared<AppMetadataFlightInfoEndpointScenario>();
     return Status::OK();
+  } else if (scenario_name == "do_exchange:echo") {
+    *out = std::make_shared<DoExchangeEchoScenario>();
+    return Status::OK();
   } else if (scenario_name == "flight_sql") {
     *out = std::make_shared<FlightSqlScenario>();
     return Status::OK();
diff --git 
a/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs 
b/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
index 99876bf769..22d0bd84fe 100644
--- a/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
+++ b/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs
@@ -69,42 +69,43 @@ namespace Apache.Arrow.Flight.Internal
 
         public override async ValueTask ReadSchemaAsync(CancellationToken 
cancellationToken)
         {
-            if (HasReadSchema)
+            while (!HasReadSchema)
             {
-                return;
-            }
-
-            var moveNextResult = await 
_flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);
-
-            if (!moveNextResult)
-            {
-                throw new Exception("No records or schema in this flight");
-            }
+                var moveNextResult = await 
_flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);
+                if (!moveNextResult)
+                {
+                    throw new Exception("No records or schema in this flight");
+                }
 
-            //AppMetadata will never be null, but length 0 if empty
-            //Those are skipped
-            if(_flightDataStream.Current.AppMetadata.Length > 0)
-            {
-                
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
-            }
+                if (_flightDescriptor == null && 
_flightDataStream.Current.FlightDescriptor != null)
+                {
+                    _flightDescriptor = new 
FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
+                }
 
-            var header = _flightDataStream.Current.DataHeader.Memory;
-            Message message = Message.GetRootAsMessage(
-                ArrowReaderImplementation.CreateByteBuffer(header));
+                // AppMetadata will never be null, but length 0 if empty
+                // Those are skipped
+                if(_flightDataStream.Current.AppMetadata.Length > 0)
+                {
+                    
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
+                }
 
+                var header = _flightDataStream.Current.DataHeader.Memory;
+                if (header.IsEmpty)
+                {
+                    // Clients may send a first message with a descriptor only 
and no schema
+                    continue;
+                }
 
-            if(_flightDataStream.Current.FlightDescriptor != null)
-            {
-                _flightDescriptor = new 
FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
-            }
+                Message message = 
Message.GetRootAsMessage(ArrowReaderImplementation.CreateByteBuffer(header));
 
-            switch (message.HeaderType)
-            {
-                case MessageHeader.Schema:
-                    _schema = 
FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
-                    break;
-                default:
-                    throw new Exception($"Expected schema as the first 
message, but got: {message.HeaderType.ToString()}");
+                switch (message.HeaderType)
+                {
+                    case MessageHeader.Schema:
+                        _schema = 
FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
+                        break;
+                    default:
+                        throw new Exception($"Expected schema as the first 
message, but got: {message.HeaderType.ToString()}");
+                }
             }
         }
 
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
index d9e0ff5230..a26bcf07ec 100644
--- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
@@ -16,6 +16,7 @@
 using System;
 using System.IO;
 using System.Threading.Tasks;
+using Apache.Arrow.Flight.IntegrationTest.Scenarios;
 
 namespace Apache.Arrow.Flight.IntegrationTest;
 
@@ -34,18 +35,13 @@ public class FlightClientCommand
 
     public async Task Execute()
     {
-        if (!string.IsNullOrEmpty(_scenario))
+        IScenario scenario = _scenario switch
         {
-            // No named scenarios are currently implemented
-            throw new Exception($"Scenario '{_scenario}' is not supported.");
-        }
+            null => new JsonTestScenario(_jsonFileInfo),
+            "do_exchange:echo" => new DoExchangeEchoScenario(),
+            _ => throw new NotSupportedException($"Scenario '{_scenario}' is 
not supported"),
+        };
 
-        if (!(_jsonFileInfo?.Exists ?? false))
-        {
-            throw new Exception($"Invalid JSON file path 
'{_jsonFileInfo?.FullName}'");
-        }
-
-        var scenario = new JsonTestScenario(_port, _jsonFileInfo);
-        await scenario.RunClient().ConfigureAwait(false);
+        await scenario.RunClient(_port).ConfigureAwait(false);
     }
 }
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
index c3a7694485..38f14b7899 100644
--- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
@@ -16,6 +16,8 @@
 using System;
 using System.Net;
 using System.Threading.Tasks;
+using Apache.Arrow.Flight.IntegrationTest.Scenarios;
+using Apache.Arrow.Flight.Server;
 using Apache.Arrow.Flight.TestWeb;
 using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Hosting.Server;
@@ -23,6 +25,8 @@ using Microsoft.AspNetCore.Hosting.Server.Features;
 using Microsoft.AspNetCore.Server.Kestrel.Core;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Console;
 
 namespace Apache.Arrow.Flight.IntegrationTest;
 
@@ -37,11 +41,12 @@ public class FlightServerCommand
 
     public async Task Execute()
     {
-        if (!string.IsNullOrEmpty(_scenario))
+        IScenario scenario = _scenario switch
         {
-            // No named scenarios are currently implemented
-            throw new Exception($"Scenario '{_scenario}' is not supported.");
-        }
+            null => null,
+            "do_exchange:echo" => new DoExchangeEchoScenario(),
+            _ => throw new NotSupportedException($"Scenario {_scenario} is not 
supported")
+        };
 
         var host = Host.CreateDefaultBuilder()
             .ConfigureWebHostDefaults(webBuilder =>
@@ -51,6 +56,26 @@ public class FlightServerCommand
                     {
                         options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => 
l.Protocols = HttpProtocols.Http2);
                     })
+                    .ConfigureServices(services =>
+                    {
+                        if (scenario == null)
+                        {
+                            // Use the TestFlightServer for JSON based 
integration tests
+                            
services.AddGrpc().AddFlightServer<TestFlightServer>();
+                            services.AddSingleton(new FlightStore());
+                        }
+                        else
+                        {
+                            // Use a scenario-specific server implementation
+                            
services.AddGrpc().Services.AddScoped<FlightServer>(_ => scenario.MakeServer());
+                        }
+
+                        // The integration tests rely on the port being 
written to the first line of stdout,
+                        // so send all logging to stderr.
+                        services.Configure<ConsoleLoggerOptions>(
+                            o => o.LogToStandardErrorThreshold = 
LogLevel.Debug);
+
+                    })
                     .UseStartup<Startup>();
             })
             .Build();
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs
new file mode 100644
index 0000000000..41ed631f33
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Server;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+/// <summary>
+/// A Flight integration test scenario
+/// </summary>
+internal interface IScenario
+{
+    /// <summary>
+    /// Create a FlightServer instance to run the scenario
+    /// </summary>
+    FlightServer MakeServer();
+
+    /// <summary>
+    /// Run the scenario using a Flight client
+    /// </summary>
+    Task RunClient(int serverPort);
+}
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs
 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs
new file mode 100644
index 0000000000..6e9b2696bb
--- /dev/null
+++ 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Server;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Net.Client;
+using Grpc.Net.Client.Balancer;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Apache.Arrow.Flight.IntegrationTest.Scenarios;
+
+internal class DoExchangeServer : FlightServer
+{
+    public override async Task DoExchange(
+        FlightServerRecordBatchStreamReader requestStream,
+        FlightServerRecordBatchStreamWriter responseStream,
+        ServerCallContext context)
+    {
+        var descriptor = await requestStream.FlightDescriptor;
+        var command = descriptor.Command?.ToStringUtf8();
+        if (command != "echo")
+        {
+            throw new Exception($"Unsupported command: '{command}'");
+        }
+
+        while (await requestStream.MoveNext())
+        {
+            await responseStream.WriteAsync(
+                requestStream.Current, 
requestStream.ApplicationMetadata.FirstOrDefault());
+        }
+    }
+}
+
+internal class DoExchangeEchoScenario : IScenario
+{
+    public FlightServer MakeServer() => new DoExchangeServer();
+
+    public async Task RunClient(int serverPort)
+    {
+        var services = new ServiceCollection();
+        services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory());
+        var serviceProvider = services.BuildServiceProvider();
+
+        var address = $"grpc+tcp://localhost:{serverPort}";
+        using var channel = GrpcChannel.ForAddress(
+            address,
+            new GrpcChannelOptions
+            {
+                ServiceProvider = serviceProvider,
+                Credentials = ChannelCredentials.Insecure
+            });
+
+        var client = new FlightClient(channel);
+        var descriptor = FlightDescriptor.CreateCommandDescriptor("echo");
+        using var exchange = client.DoExchange(descriptor);
+
+        using var writer = exchange.RequestStream;
+        using var reader = exchange.ResponseStream;
+
+        for (var batchIdx = 0; batchIdx < 4; batchIdx++)
+        {
+            using var batch = new RecordBatch.Builder()
+                .Append(
+                    "x",
+                    nullable: false,
+                    array: new 
Int32Array.Builder().AppendRange(Enumerable.Range(batchIdx, 10)).Build())
+                .Build();
+
+            var expectedMetadata = $"{batchIdx}";
+            var writeMetadata = batchIdx % 2 == 0;
+            if (writeMetadata)
+            {
+                await writer.WriteAsync(batch, 
ByteString.CopyFromUtf8(expectedMetadata));
+            }
+            else
+            {
+                await writer.WriteAsync(batch);
+            }
+
+            if (!await reader.MoveNext(CancellationToken.None))
+            {
+                throw new Exception("Unexpected end of read stream");
+            }
+
+            var readMetadata = 
reader.ApplicationMetadata?.FirstOrDefault()?.ToStringUtf8();
+
+            if (writeMetadata && readMetadata != expectedMetadata)
+            {
+                throw new Exception($"Expected metadata '{expectedMetadata}' 
but received '{readMetadata}'");
+            }
+            if (!writeMetadata && readMetadata != null)
+            {
+                throw new Exception($"Unexpected metadata received: 
'{readMetadata}'");
+            }
+        }
+
+        await writer.CompleteAsync();
+
+        if (await reader.MoveNext(CancellationToken.None))
+        {
+            throw new Exception("Expected end of read stream");
+        }
+    }
+}
diff --git 
a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs
similarity index 91%
rename from csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
rename to 
csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs
index f4f3ac28bf..4f7fed7435 100644
--- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
+++ 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs
@@ -18,6 +18,7 @@ using System.IO;
 using System.Linq;
 using System.Threading.Tasks;
 using Apache.Arrow.Flight.Client;
+using Apache.Arrow.Flight.Server;
 using Apache.Arrow.IntegrationTest;
 using Apache.Arrow.Tests;
 using Apache.Arrow.Types;
@@ -27,20 +28,23 @@ using Grpc.Core;
 using Grpc.Net.Client.Balancer;
 using Microsoft.Extensions.DependencyInjection;
 
-namespace Apache.Arrow.Flight.IntegrationTest;
+namespace Apache.Arrow.Flight.IntegrationTest.Scenarios;
 
 /// <summary>
 /// A test scenario defined using a JSON data file
 /// </summary>
-internal class JsonTestScenario
+internal class JsonTestScenario : IScenario
 {
-    private readonly int _serverPort;
     private readonly FileInfo _jsonFile;
     private readonly ServiceProvider _serviceProvider;
 
-    public JsonTestScenario(int serverPort, FileInfo jsonFile)
+    public JsonTestScenario(FileInfo jsonFile)
     {
-        _serverPort = serverPort;
+        if (!(jsonFile?.Exists ?? false))
+        {
+            throw new Exception($"Invalid JSON file path 
'{jsonFile?.FullName}'");
+        }
+
         _jsonFile = jsonFile;
 
         var services = new ServiceCollection();
@@ -48,9 +52,14 @@ internal class JsonTestScenario
         _serviceProvider = services.BuildServiceProvider();
     }
 
-    public async Task RunClient()
+    public FlightServer MakeServer()
+    {
+        throw new NotImplementedException();
+    }
+
+    public async Task RunClient(int serverPort)
     {
-        var address = $"grpc+tcp://localhost:{_serverPort}";
+        var address = $"grpc+tcp://localhost:{serverPort}";
         using var channel = GrpcChannel.ForAddress(
             address,
             new GrpcChannelOptions
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs 
b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs
similarity index 66%
copy from csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
copy to csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs
index d1cfe9e445..7e29d1997e 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs
@@ -16,30 +16,12 @@
 using Microsoft.AspNetCore.Builder;
 using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Http;
-using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Logging.Console;
 
-namespace Apache.Arrow.Flight.TestWeb
+namespace Apache.Arrow.Flight.IntegrationTest
 {
     public class Startup
     {
-        // This method gets called by the runtime. Use this method to add 
services to the container.
-        // For more information on how to configure your application, visit 
https://go.microsoft.com/fwlink/?LinkID=398940
-        public void ConfigureServices(IServiceCollection services)
-        {
-            services.AddGrpc()
-                .AddFlightServer<TestFlightServer>();
-
-            services.AddSingleton(new FlightStore());
-
-            // The integration tests rely on the port being written to the 
first line of stdout,
-            // so send all logging to stderr.
-            services.Configure<ConsoleLoggerOptions>(
-                o => o.LogToStandardErrorThreshold = LogLevel.Debug);
-        }
-
         // This method gets called by the runtime. Use this method to 
configure the HTTP request pipeline.
         public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
         {
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs 
b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
index d1cfe9e445..68ce378ccd 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
+++ b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
@@ -18,8 +18,6 @@ using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Http;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Logging.Console;
 
 namespace Apache.Arrow.Flight.TestWeb
 {
@@ -33,11 +31,6 @@ namespace Apache.Arrow.Flight.TestWeb
                 .AddFlightServer<TestFlightServer>();
 
             services.AddSingleton(new FlightStore());
-
-            // The integration tests rely on the port being written to the 
first line of stdout,
-            // so send all logging to stderr.
-            services.Configure<ConsoleLoggerOptions>(
-                o => o.LogToStandardErrorThreshold = LogLevel.Debug);
         }
 
         // This method gets called by the runtime. Use this method to 
configure the HTTP request pipeline.
diff --git a/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs 
b/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
index 0e82673d02..350762c992 100644
--- a/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
+++ b/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
@@ -24,7 +24,6 @@ using Apache.Arrow.Tests;
 using Google.Protobuf;
 using Grpc.Core;
 using Grpc.Core.Utils;
-using Python.Runtime;
 using Xunit;
 
 namespace Apache.Arrow.Flight.Tests
diff --git a/dev/archery/archery/integration/runner.py 
b/dev/archery/archery/integration/runner.py
index 378b17d75f..781b41090d 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -669,6 +669,12 @@ def run_all_tests(with_cpp=True, with_java=True, 
with_js=True,
                          "RenewFlightEndpoint are working as expected."),
             skip_testers={"JS", "C#", "Rust"},
         ),
+        Scenario(
+            "do_exchange:echo",
+            description=("Test the do_exchange method by "
+                         "echoing data back to the client."),
+            skip_testers={"Go", "Java", "JS", "Rust"},
+        ),
         Scenario(
             "location:reuse_connection",
             description="Ensure arrow-flight-reuse-connection is accepted.",
diff --git a/docs/source/status.rst b/docs/source/status.rst
index c838604fca..5ab35f7639 100644
--- a/docs/source/status.rst
+++ b/docs/source/status.rst
@@ -208,15 +208,15 @@ Supported features in the gRPC transport:
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Flight RPC Feature                         | C++   | Java  | Go    | JS | C# 
   | Rust  | Julia | Swift |
 
+============================================+=======+=======+=======+====+=======+=======+=======+=======+
-| All RPC methods                            | ✓     | ✓     | ✓     |    | ✓ 
(1) | ✓     |       |       |
+| All RPC methods                            | ✓     | ✓     | ✓     |    | ✓  
   | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
-| Authentication handlers                    | ✓     | ✓     | ✓     |    | ✓ 
(2) | ✓     |       |       |
+| Authentication handlers                    | ✓     | ✓     | ✓     |    | ✓ 
(1) | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Call timeouts                              | ✓     | ✓     | ✓     |    |    
   | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Call cancellation                          | ✓     | ✓     | ✓     |    |    
   | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
-| Concurrent client calls (3)                | ✓     | ✓     | ✓     |    | ✓  
   | ✓     |       |       |
+| Concurrent client calls (2)                | ✓     | ✓     | ✓     |    | ✓  
   | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Custom middleware                          | ✓     | ✓     | ✓     |    |    
   | ✓     |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
@@ -228,7 +228,7 @@ Supported features in the UCX transport:
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Flight RPC Feature                         | C++   | Java  | Go    | JS | C# 
   | Rust  | Julia | Swift |
 
+============================================+=======+=======+=======+====+=======+=======+=======+=======+
-| All RPC methods                            | ✓ (4) |       |       |    |    
   |       |       |       |
+| All RPC methods                            | ✓ (3) |       |       |    |    
   |       |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Authentication handlers                    |       |       |       |    |    
   |       |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
@@ -236,7 +236,7 @@ Supported features in the UCX transport:
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Call cancellation                          |       |       |       |    |    
   |       |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
-| Concurrent client calls                    | ✓ (5) |       |       |    |    
   |       |       |       |
+| Concurrent client calls                    | ✓ (4) |       |       |    |    
   |       |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
 | Custom middleware                          |       |       |       |    |    
   |       |       |       |
 
+--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+
@@ -245,11 +245,10 @@ Supported features in the UCX transport:
 
 Notes:
 
-* \(1) No support for Handshake or DoExchange.
-* \(2) Support using AspNetCore authentication handlers.
-* \(3) Whether a single client can support multiple concurrent calls.
-* \(4) Only support for DoExchange, DoGet, DoPut, and GetFlightInfo.
-* \(5) Each concurrent call is a separate connection to the server
+* \(1) Support using AspNetCore authentication handlers.
+* \(2) Whether a single client can support multiple concurrent calls.
+* \(3) Only support for DoExchange, DoGet, DoPut, and GetFlightInfo.
+* \(4) Each concurrent call is a separate connection to the server
   (unlike gRPC where concurrent calls are multiplexed over a single
   connection). This will generally provide better throughput but
   consumes more resources both on the server and the client.

Reply via email to