This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bc52f9f0e5 ARROW-17254: [C++][Go][Java][FlightRPC] Implement and test 
Flight SQL GetSchema (#13898)
bc52f9f0e5 is described below

commit bc52f9f0e582474501e92e6a281f0110754a8af1
Author: David Li <[email protected]>
AuthorDate: Thu Aug 18 12:04:42 2022 -0400

    ARROW-17254: [C++][Go][Java][FlightRPC] Implement and test Flight SQL 
GetSchema (#13898)
    
    Consistently implements and tests the GetSchema method in Flight SQL.
    
    Builds on #13897.
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 .../arrow/flight/integration_tests/CMakeLists.txt  |  11 +
 .../integration_tests/flight_integration_test.cc   |  60 ++++
 .../flight/integration_tests/test_integration.cc   | 351 +++++++++++++--------
 cpp/src/arrow/flight/sql/client.cc                 | 208 ++++++++----
 cpp/src/arrow/flight/sql/client.h                  |  83 ++++-
 cpp/src/arrow/flight/sql/server.cc                 |  79 +++++
 cpp/src/arrow/flight/sql/server.h                  |  23 ++
 cpp/src/arrow/flight/types.cc                      |   5 +-
 cpp/src/arrow/flight/types.h                       |   2 +-
 cpp/src/arrow/python/flight.cc                     |   5 +-
 go/arrow/flight/flightsql/client.go                |  88 ++++++
 go/arrow/flight/flightsql/server.go                |  60 ++++
 go/arrow/internal/flight_integration/scenario.go   | 131 +++++++-
 .../integration/tests/FlightSqlScenario.java       |  36 ++-
 .../tests/FlightSqlScenarioProducer.java           |   9 +
 .../flight/integration/tests/IntegrationTest.java  |  65 ++++
 .../apache/arrow/flight/sql/FlightSqlClient.java   | 135 ++++++++
 .../apache/arrow/flight/sql/FlightSqlProducer.java |  45 ++-
 .../apache/arrow/flight/sql/FlightSqlUtils.java    |   2 +-
 19 files changed, 1173 insertions(+), 225 deletions(-)

diff --git a/cpp/src/arrow/flight/integration_tests/CMakeLists.txt 
b/cpp/src/arrow/flight/integration_tests/CMakeLists.txt
index 66a021b4b5..1bbd923160 100644
--- a/cpp/src/arrow/flight/integration_tests/CMakeLists.txt
+++ b/cpp/src/arrow/flight/integration_tests/CMakeLists.txt
@@ -40,3 +40,14 @@ target_link_libraries(flight-test-integration-client
 
 add_dependencies(arrow-integration flight-test-integration-client
                  flight-test-integration-server)
+
+if(ARROW_BUILD_TESTS)
+  add_arrow_test(flight_integration_test
+                 SOURCES
+                 flight_integration_test.cc
+                 test_integration.cc
+                 STATIC_LINK_LIBS
+                 ${ARROW_FLIGHT_INTEGRATION_TEST_LINK_LIBS}
+                 LABELS
+                 "arrow_flight")
+endif()
diff --git a/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc 
b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
new file mode 100644
index 0000000000..706ac3b7d9
--- /dev/null
+++ b/cpp/src/arrow/flight/integration_tests/flight_integration_test.cc
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Run the integration test scenarios in-process.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "arrow/flight/integration_tests/test_integration.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_util.h"
+
+namespace arrow {
+namespace flight {
+namespace integration_tests {
+
+Status RunScenario(const std::string& scenario_name) {
+  std::shared_ptr<Scenario> scenario;
+  ARROW_RETURN_NOT_OK(GetScenario(scenario_name, &scenario));
+
+  std::unique_ptr<FlightServerBase> server;
+  ARROW_ASSIGN_OR_RAISE(Location bind_location,
+                        arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
+  FlightServerOptions server_options(bind_location);
+  ARROW_RETURN_NOT_OK(scenario->MakeServer(&server, &server_options));
+  ARROW_RETURN_NOT_OK(server->Init(server_options));
+
+  ARROW_ASSIGN_OR_RAISE(Location location,
+                        arrow::flight::Location::ForGrpcTcp("0.0.0.0", 
server->port()));
+  auto client_options = arrow::flight::FlightClientOptions::Defaults();
+  ARROW_RETURN_NOT_OK(scenario->MakeClient(&client_options));
+  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<FlightClient> client,
+                        FlightClient::Connect(location, client_options));
+  ARROW_RETURN_NOT_OK(scenario->RunClient(std::move(client)));
+  return Status::OK();
+}
+
+TEST(FlightIntegration, AuthBasicProto) { 
ASSERT_OK(RunScenario("auth:basic_proto")); }
+
+TEST(FlightIntegration, Middleware) { ASSERT_OK(RunScenario("middleware")); }
+
+TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }
+
+}  // namespace integration_tests
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc 
b/cpp/src/arrow/flight/integration_tests/test_integration.cc
index 7bdd27da79..b228f9cceb 100644
--- a/cpp/src/arrow/flight/integration_tests/test_integration.cc
+++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -117,7 +117,7 @@ class AuthBasicProtoScenario : public Scenario {
 /// regardless of what gRPC does.
 class TestServerMiddleware : public ServerMiddleware {
  public:
-  explicit TestServerMiddleware(std::string received) : received_(received) {}
+  explicit TestServerMiddleware(std::string received) : 
received_(std::move(received)) {}
 
   void SendingHeaders(AddCallHeaders* outgoing_headers) override {
     outgoing_headers->AddHeader("x-middleware", received_);
@@ -154,11 +154,11 @@ class TestClientMiddleware : public ClientMiddleware {
   explicit TestClientMiddleware(std::string* received_header)
       : received_header_(received_header) {}
 
-  void SendingHeaders(AddCallHeaders* outgoing_headers) {
+  void SendingHeaders(AddCallHeaders* outgoing_headers) override {
     outgoing_headers->AddHeader("x-middleware", "expected value");
   }
 
-  void ReceivedHeaders(const CallHeaders& incoming_headers) {
+  void ReceivedHeaders(const CallHeaders& incoming_headers) override {
     // We expect the server to always send this header. gRPC/Java may
     // send it in trailers instead of headers, so we expect Flight to
     // account for this.
@@ -170,7 +170,7 @@ class TestClientMiddleware : public ClientMiddleware {
     }
   }
 
-  void CallCompleted(const Status& status) {}
+  void CallCompleted(const Status& status) override {}
 
  private:
   std::string* received_header_;
@@ -178,7 +178,8 @@ class TestClientMiddleware : public ClientMiddleware {
 
 class TestClientMiddlewareFactory : public ClientMiddlewareFactory {
  public:
-  void StartCall(const CallInfo& info, std::unique_ptr<ClientMiddleware>* 
middleware) {
+  void StartCall(const CallInfo& info,
+                 std::unique_ptr<ClientMiddleware>* middleware) override {
     *middleware =
         std::unique_ptr<ClientMiddleware>(new 
TestClientMiddleware(&received_header_));
   }
@@ -218,8 +219,8 @@ class MiddlewareServer : public FlightServerBase {
 class MiddlewareScenario : public Scenario {
   Status MakeServer(std::unique_ptr<FlightServerBase>* server,
                     FlightServerOptions* options) override {
-    options->middleware.push_back(
-        {"grpc_trailers", std::make_shared<TestServerMiddlewareFactory>()});
+    options->middleware.emplace_back("grpc_trailers",
+                                     
std::make_shared<TestServerMiddlewareFactory>());
     server->reset(new MiddlewareServer());
     return Status::OK();
   }
@@ -284,11 +285,13 @@ std::shared_ptr<Schema> GetQuerySchema() {
 
 constexpr int64_t kUpdateStatementExpectedRows = 10000L;
 constexpr int64_t kUpdatePreparedStatementExpectedRows = 20000L;
+constexpr char kSelectStatement[] = "SELECT STATEMENT";
 
 template <typename T>
-arrow::Status AssertEq(const T& expected, const T& actual) {
+arrow::Status AssertEq(const T& expected, const T& actual, const std::string& 
message) {
   if (expected != actual) {
-    return Status::Invalid("Expected \"", expected, "\", got \'", actual, 
"\"");
+    return Status::Invalid(message, ": expected \"", expected, "\", got \"", 
actual,
+                           "\"");
   }
   return Status::OK();
 }
@@ -301,7 +304,9 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
       const ServerCallContext& context, const sql::StatementQuery& command,
       const FlightDescriptor& descriptor) override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT STATEMENT", 
command.query));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>(kSelectStatement, command.query,
+                              "Unexpected statement in 
GetFlightInfoStatement"));
 
     ARROW_ASSIGN_OR_RAISE(auto handle,
                           sql::CreateStatementQueryTicket("SELECT STATEMENT 
HANDLE"));
@@ -313,6 +318,14 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
     return std::unique_ptr<FlightInfo>(new FlightInfo(result));
   }
 
+  arrow::Result<std::unique_ptr<SchemaResult>> GetSchemaStatement(
+      const ServerCallContext& context, const sql::StatementQuery& command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        kSelectStatement, command.query, "Unexpected statement in 
GetSchemaStatement"));
+    return SchemaResult::Make(*GetQuerySchema());
+  }
+
   arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
       const ServerCallContext& context,
       const sql::StatementQueryTicket& command) override {
@@ -323,11 +336,21 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
       const ServerCallContext& context, const sql::PreparedStatementQuery& 
command,
       const FlightDescriptor& descriptor) override {
     ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT 
HANDLE",
-                                              
command.prepared_statement_handle));
+                                              
command.prepared_statement_handle,
+                                              "Unexpected prepared statement 
handle"));
 
     return GetFlightInfoForCommand(descriptor, GetQuerySchema());
   }
 
+  arrow::Result<std::unique_ptr<SchemaResult>> GetSchemaPreparedStatement(
+      const ServerCallContext& context, const sql::PreparedStatementQuery& 
command,
+      const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT 
HANDLE",
+                                              
command.prepared_statement_handle,
+                                              "Unexpected prepared statement 
handle"));
+    return SchemaResult::Make(*GetQuerySchema());
+  }
+
   arrow::Result<std::unique_ptr<FlightDataStream>> DoGetPreparedStatement(
       const ServerCallContext& context,
       const sql::PreparedStatementQuery& command) override {
@@ -358,11 +381,14 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSqlInfo(
       const ServerCallContext& context, const sql::GetSqlInfo& command,
       const FlightDescriptor& descriptor) override {
-    ARROW_RETURN_NOT_OK(AssertEq<int64_t>(2, command.info.size()));
-    ARROW_RETURN_NOT_OK(AssertEq<int32_t>(
-        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME, 
command.info[0]));
-    ARROW_RETURN_NOT_OK(AssertEq<int32_t>(
-        sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY, 
command.info[1]));
+    ARROW_RETURN_NOT_OK(AssertEq<int64_t>(2, command.info.size(),
+                                          "Wrong number of SqlInfo values 
passed"));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<int32_t>(sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME,
+                          command.info[0], "Unexpected SqlInfo passed"));
+    ARROW_RETURN_NOT_OK(
+        
AssertEq<int32_t>(sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY,
+                          command.info[1], "Unexpected SqlInfo passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetSqlInfoSchema());
   }
@@ -375,9 +401,11 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoSchemas(
       const ServerCallContext& context, const sql::GetDbSchemas& command,
       const FlightDescriptor& descriptor) override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", 
command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", 
command.catalog.value(),
+                                              "Wrong catalog passed"));
     ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
-                                              
command.db_schema_filter_pattern.value()));
+                                              
command.db_schema_filter_pattern.value(),
+                                              "Wrong db_schema_filter_pattern 
passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetDbSchemasSchema());
   }
@@ -390,15 +418,22 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoTables(
       const ServerCallContext& context, const sql::GetTables& command,
       const FlightDescriptor& descriptor) override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", 
command.catalog.value()));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("catalog", 
command.catalog.value(),
+                                              "Wrong catalog passed"));
     ARROW_RETURN_NOT_OK(AssertEq<std::string>("db_schema_filter_pattern",
-                                              
command.db_schema_filter_pattern.value()));
+                                              
command.db_schema_filter_pattern.value(),
+                                              "Wrong db_schema_filter_pattern 
passed"));
     ARROW_RETURN_NOT_OK(AssertEq<std::string>("table_filter_pattern",
-                                              
command.table_name_filter_pattern.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<int64_t>(2, command.table_types.size()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", 
command.table_types[0]));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("view", command.table_types[1]));
-    ARROW_RETURN_NOT_OK(AssertEq<bool>(true, command.include_schema));
+                                              
command.table_name_filter_pattern.value(),
+                                              "Wrong table_filter_pattern 
passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<int64_t>(2, command.table_types.size(),
+                                          "Wrong number of table types 
passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", command.table_types[0],
+                                              "Wrong table type passed"));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("view", command.table_types[1], "Wrong table 
type passed"));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<bool>(true, command.include_schema, "include_schema should be 
true"));
 
     return GetFlightInfoForCommand(descriptor,
                                    
sql::SqlSchema::GetTablesSchemaWithIncludedSchema());
@@ -422,11 +457,12 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoPrimaryKeys(
       const ServerCallContext& context, const sql::GetPrimaryKeys& command,
       const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "catalog", command.table_ref.catalog.value(), "Wrong catalog passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "db_schema", command.table_ref.db_schema.value(), "Wrong db_schema 
passed"));
     ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("db_schema", 
command.table_ref.db_schema.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", 
command.table_ref.table));
+        AssertEq<std::string>("table", command.table_ref.table, "Wrong table 
passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetPrimaryKeysSchema());
   }
@@ -439,11 +475,12 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoExportedKeys(
       const ServerCallContext& context, const sql::GetExportedKeys& command,
       const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "catalog", command.table_ref.catalog.value(), "Wrong catalog passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "db_schema", command.table_ref.db_schema.value(), "Wrong db_schema 
passed"));
     ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("db_schema", 
command.table_ref.db_schema.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", 
command.table_ref.table));
+        AssertEq<std::string>("table", command.table_ref.table, "Wrong table 
passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetExportedKeysSchema());
   }
@@ -456,11 +493,12 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoImportedKeys(
       const ServerCallContext& context, const sql::GetImportedKeys& command,
       const FlightDescriptor& descriptor) override {
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "catalog", command.table_ref.catalog.value(), "Wrong catalog passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "db_schema", command.table_ref.db_schema.value(), "Wrong db_schema 
passed"));
     ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("catalog", command.table_ref.catalog.value()));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("db_schema", 
command.table_ref.db_schema.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("table", 
command.table_ref.table));
+        AssertEq<std::string>("table", command.table_ref.table, "Wrong table 
passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetImportedKeysSchema());
   }
@@ -473,16 +511,20 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCrossReference(
       const ServerCallContext& context, const sql::GetCrossReference& command,
       const FlightDescriptor& descriptor) override {
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("pk_catalog", 
command.pk_table_ref.catalog.value()));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("pk_db_schema", 
command.pk_table_ref.db_schema.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", 
command.pk_table_ref.table));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("fk_catalog", 
command.fk_table_ref.catalog.value()));
-    ARROW_RETURN_NOT_OK(
-        AssertEq<std::string>("fk_db_schema", 
command.fk_table_ref.db_schema.value()));
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", 
command.fk_table_ref.table));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "pk_catalog", command.pk_table_ref.catalog.value(), "Wrong pk catalog 
passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_db_schema",
+                                              
command.pk_table_ref.db_schema.value(),
+                                              "Wrong pk db_schema passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("pk_table", 
command.pk_table_ref.table,
+                                              "Wrong pk table passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>(
+        "fk_catalog", command.fk_table_ref.catalog.value(), "Wrong fk catalog 
passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_db_schema",
+                                              
command.fk_table_ref.db_schema.value(),
+                                              "Wrong fk db_schema passed"));
+    ARROW_RETURN_NOT_OK(AssertEq<std::string>("fk_table", 
command.fk_table_ref.table,
+                                              "Wrong fk table passed"));
 
     return GetFlightInfoForCommand(descriptor, 
sql::SqlSchema::GetTableTypesSchema());
   }
@@ -494,7 +536,9 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
 
   arrow::Result<int64_t> DoPutCommandStatementUpdate(
       const ServerCallContext& context, const sql::StatementUpdate& command) 
override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE STATEMENT", 
command.query));
+    ARROW_RETURN_NOT_OK(
+        AssertEq<std::string>("UPDATE STATEMENT", command.query,
+                              "Wrong query for DoPutCommandStatementUpdate"));
 
     return kUpdateStatementExpectedRows;
   }
@@ -502,9 +546,10 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<sql::ActionCreatePreparedStatementResult> 
CreatePreparedStatement(
       const ServerCallContext& context,
       const sql::ActionCreatePreparedStatementRequest& request) override {
-    ARROW_RETURN_NOT_OK(
-        AssertEq<bool>(true, request.query == "SELECT PREPARED STATEMENT" ||
-                                 request.query == "UPDATE PREPARED 
STATEMENT"));
+    if (request.query != "SELECT PREPARED STATEMENT" &&
+        request.query != "UPDATE PREPARED STATEMENT") {
+      return Status::Invalid("Unexpected query: ", request.query);
+    }
 
     sql::ActionCreatePreparedStatementResult result;
     result.prepared_statement_handle = request.query + " HANDLE";
@@ -515,6 +560,11 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   Status ClosePreparedStatement(
       const ServerCallContext& context,
       const sql::ActionClosePreparedStatementRequest& request) override {
+    if (request.prepared_statement_handle != "SELECT PREPARED STATEMENT 
HANDLE" &&
+        request.prepared_statement_handle != "UPDATE PREPARED STATEMENT 
HANDLE") {
+      return Status::Invalid("Invalid handle for ClosePreparedStatement: ",
+                             request.prepared_statement_handle);
+    }
     return Status::OK();
   }
 
@@ -522,11 +572,14 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
                                      const sql::PreparedStatementQuery& 
command,
                                      FlightMessageReader* reader,
                                      FlightMetadataWriter* writer) override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("SELECT PREPARED STATEMENT 
HANDLE",
-                                              
command.prepared_statement_handle));
+    if (command.prepared_statement_handle != "SELECT PREPARED STATEMENT 
HANDLE") {
+      return Status::Invalid("Invalid handle for DoPutPreparedStatementQuery: 
",
+                             command.prepared_statement_handle);
+    }
 
     ARROW_ASSIGN_OR_RAISE(auto actual_schema, reader->GetSchema());
-    ARROW_RETURN_NOT_OK(AssertEq<Schema>(*GetQuerySchema(), *actual_schema));
+    ARROW_RETURN_NOT_OK(AssertEq<Schema>(*GetQuerySchema(), *actual_schema,
+                                         "Wrong schema for 
DoPutPreparedStatementQuery"));
 
     return Status::OK();
   }
@@ -534,10 +587,11 @@ class FlightSqlScenarioServer : public 
sql::FlightSqlServerBase {
   arrow::Result<int64_t> DoPutPreparedStatementUpdate(
       const ServerCallContext& context, const sql::PreparedStatementUpdate& 
command,
       FlightMessageReader* reader) override {
-    ARROW_RETURN_NOT_OK(AssertEq<std::string>("UPDATE PREPARED STATEMENT 
HANDLE",
-                                              
command.prepared_statement_handle));
-
-    return kUpdatePreparedStatementExpectedRows;
+    if (command.prepared_statement_handle == "UPDATE PREPARED STATEMENT 
HANDLE") {
+      return kUpdatePreparedStatementExpectedRows;
+    }
+    return Status::Invalid("Invalid handle for DoPutPreparedStatementUpdate: ",
+                           command.prepared_statement_handle);
   }
 
  private:
@@ -569,19 +623,27 @@ class FlightSqlScenario : public Scenario {
 
   Status MakeClient(FlightClientOptions* options) override { return 
Status::OK(); }
 
-  Status Validate(std::shared_ptr<Schema> expected_schema,
-                  arrow::Result<std::unique_ptr<FlightInfo>> 
flight_info_result,
-                  sql::FlightSqlClient* sql_client) {
+  Status Validate(const std::shared_ptr<Schema>& expected_schema,
+                  const FlightInfo& flight_info, sql::FlightSqlClient* 
sql_client) {
     FlightCallOptions call_options;
-
-    ARROW_ASSIGN_OR_RAISE(auto flight_info, flight_info_result);
     ARROW_ASSIGN_OR_RAISE(
-        auto reader, sql_client->DoGet(call_options, 
flight_info->endpoints()[0].ticket));
-
+        std::unique_ptr<FlightStreamReader> reader,
+        sql_client->DoGet(call_options, flight_info.endpoints()[0].ticket));
     ARROW_ASSIGN_OR_RAISE(auto actual_schema, reader->GetSchema());
+    if (!expected_schema->Equals(*actual_schema, /*check_metadata=*/true)) {
+      return Status::Invalid("Schemas did not match. Expected:\n", 
*expected_schema,
+                             "\nActual:\n", *actual_schema);
+    }
+    ARROW_RETURN_NOT_OK(reader->ToTable());
+    return Status::OK();
+  }
 
-    if (!actual_schema->Equals(*expected_schema, /*check_metadata=*/true)) {
-      return Status::Invalid("Schemas do not match. Expected:\n", 
*expected_schema,
+  Status ValidateSchema(const std::shared_ptr<Schema>& expected_schema,
+                        const SchemaResult& result) {
+    ipc::DictionaryMemo memo;
+    ARROW_ASSIGN_OR_RAISE(auto actual_schema, result.GetSchema(&memo));
+    if (!expected_schema->Equals(*actual_schema, /*check_metadata=*/true)) {
+      return Status::Invalid("Schemas did not match. Expected:\n", 
*expected_schema,
                              "\nActual:\n", *actual_schema);
     }
     return Status::OK();
@@ -589,13 +651,9 @@ class FlightSqlScenario : public Scenario {
 
   Status RunClient(std::unique_ptr<FlightClient> client) override {
     sql::FlightSqlClient sql_client(std::move(client));
-
     ARROW_RETURN_NOT_OK(ValidateMetadataRetrieval(&sql_client));
-
     ARROW_RETURN_NOT_OK(ValidateStatementExecution(&sql_client));
-
     ARROW_RETURN_NOT_OK(ValidatePreparedStatementExecution(&sql_client));
-
     return Status::OK();
   }
 
@@ -613,82 +671,119 @@ class FlightSqlScenario : public Scenario {
     sql::TableRef pk_table_ref = {"pk_catalog", "pk_db_schema", "pk_table"};
     sql::TableRef fk_table_ref = {"fk_catalog", "fk_db_schema", "fk_table"};
 
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetCatalogsSchema(),
-                                 sql_client->GetCatalogs(options), 
sql_client));
+    std::unique_ptr<FlightInfo> info;
+    std::unique_ptr<SchemaResult> schema;
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetCatalogs(options));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetCatalogsSchema(options));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetCatalogsSchema(), *info, 
sql_client));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetCatalogsSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(
+        info, sql_client->GetDbSchemas(options, &catalog, 
&db_schema_filter_pattern));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetDbSchemasSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetDbSchemasSchema(), *info, sql_client));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetDbSchemasSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(
+        info, sql_client->GetTables(options, &catalog, 
&db_schema_filter_pattern,
+                                    &table_filter_pattern, true, 
&table_types));
+    ARROW_ASSIGN_OR_RAISE(schema,
+                          sql_client->GetTablesSchema(options, 
/*include_schema=*/true));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetTablesSchemaWithIncludedSchema(), *info, 
sql_client));
+    ARROW_RETURN_NOT_OK(
+        ValidateSchema(sql::SqlSchema::GetTablesSchemaWithIncludedSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(schema,
+                          sql_client->GetTablesSchema(options, 
/*include_schema=*/false));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetTablesSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetTableTypes(options));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetTableTypesSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetTableTypesSchema(), *info, sql_client));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetTableTypesSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetPrimaryKeys(options, 
table_ref));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetPrimaryKeysSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetPrimaryKeysSchema(), *info, sql_client));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetPrimaryKeysSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetExportedKeys(options, 
table_ref));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetExportedKeysSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetExportedKeysSchema(), *info, sql_client));
+    
ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetExportedKeysSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetImportedKeys(options, 
table_ref));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetImportedKeysSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetImportedKeysSchema(), *info, sql_client));
+    
ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetImportedKeysSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(
+        info, sql_client->GetCrossReference(options, pk_table_ref, 
fk_table_ref));
+    ARROW_ASSIGN_OR_RAISE(schema, 
sql_client->GetCrossReferenceSchema(options));
     ARROW_RETURN_NOT_OK(
-        Validate(sql::SqlSchema::GetDbSchemasSchema(),
-                 sql_client->GetDbSchemas(options, &catalog, 
&db_schema_filter_pattern),
-                 sql_client));
+        Validate(sql::SqlSchema::GetCrossReferenceSchema(), *info, 
sql_client));
     ARROW_RETURN_NOT_OK(
-        Validate(sql::SqlSchema::GetTablesSchemaWithIncludedSchema(),
-                 sql_client->GetTables(options, &catalog, 
&db_schema_filter_pattern,
-                                       &table_filter_pattern, true, 
&table_types),
-                 sql_client));
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetTableTypesSchema(),
-                                 sql_client->GetTableTypes(options), 
sql_client));
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetPrimaryKeysSchema(),
-                                 sql_client->GetPrimaryKeys(options, 
table_ref),
-                                 sql_client));
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetExportedKeysSchema(),
-                                 sql_client->GetExportedKeys(options, 
table_ref),
-                                 sql_client));
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetImportedKeysSchema(),
-                                 sql_client->GetImportedKeys(options, 
table_ref),
-                                 sql_client));
-    ARROW_RETURN_NOT_OK(Validate(
-        sql::SqlSchema::GetCrossReferenceSchema(),
-        sql_client->GetCrossReference(options, pk_table_ref, fk_table_ref), 
sql_client));
-    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetXdbcTypeInfoSchema(),
-                                 sql_client->GetXdbcTypeInfo(options), 
sql_client));
-    ARROW_RETURN_NOT_OK(Validate(
-        sql::SqlSchema::GetSqlInfoSchema(),
-        sql_client->GetSqlInfo(
-            options, {sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME,
-                      
sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY}),
-        sql_client));
+        ValidateSchema(sql::SqlSchema::GetCrossReferenceSchema(), *schema));
+
+    ARROW_ASSIGN_OR_RAISE(info, sql_client->GetXdbcTypeInfo(options));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetXdbcTypeInfoSchema(options));
+    ARROW_RETURN_NOT_OK(
+        Validate(sql::SqlSchema::GetXdbcTypeInfoSchema(), *info, sql_client));
+    
ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetXdbcTypeInfoSchema(), 
*schema));
+
+    ARROW_ASSIGN_OR_RAISE(
+        info, sql_client->GetSqlInfo(
+                  options, 
{sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_NAME,
+                            
sql::SqlInfoOptions::SqlInfo::FLIGHT_SQL_SERVER_READ_ONLY}));
+    ARROW_ASSIGN_OR_RAISE(schema, sql_client->GetSqlInfoSchema(options));
+    ARROW_RETURN_NOT_OK(Validate(sql::SqlSchema::GetSqlInfoSchema(), *info, 
sql_client));
+    ARROW_RETURN_NOT_OK(ValidateSchema(sql::SqlSchema::GetSqlInfoSchema(), 
*schema));
 
     return Status::OK();
   }
 
   Status ValidateStatementExecution(sql::FlightSqlClient* sql_client) {
-    FlightCallOptions options;
+    ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute({}, 
kSelectStatement));
+    ARROW_RETURN_NOT_OK(Validate(GetQuerySchema(), *info, sql_client));
 
-    ARROW_RETURN_NOT_OK(Validate(
-        GetQuerySchema(), sql_client->Execute(options, "SELECT STATEMENT"), 
sql_client));
-    ARROW_ASSIGN_OR_RAISE(auto update_statement_result,
-                          sql_client->ExecuteUpdate(options, "UPDATE 
STATEMENT"));
-    if (update_statement_result != kUpdateStatementExpectedRows) {
-      return Status::Invalid("Expected 'UPDATE STATEMENT' return ",
-                             kUpdateStatementExpectedRows, ", got ",
-                             update_statement_result);
-    }
+    ARROW_ASSIGN_OR_RAISE(auto schema,
+                          sql_client->GetExecuteSchema({}, kSelectStatement));
+    ARROW_RETURN_NOT_OK(ValidateSchema(GetQuerySchema(), *schema));
+
+    ARROW_ASSIGN_OR_RAISE(auto updated_rows,
+                          sql_client->ExecuteUpdate({}, "UPDATE STATEMENT"));
+    ARROW_RETURN_NOT_OK(AssertEq(kUpdateStatementExpectedRows, updated_rows,
+                                 "Wrong number of updated rows for 
ExecuteUpdate"));
 
     return Status::OK();
   }
 
   Status ValidatePreparedStatementExecution(sql::FlightSqlClient* sql_client) {
-    FlightCallOptions options;
-
-    ARROW_ASSIGN_OR_RAISE(auto select_prepared_statement,
-                          sql_client->Prepare(options, "SELECT PREPARED 
STATEMENT"));
-
     auto parameters =
         RecordBatch::Make(GetQuerySchema(), 1, {ArrayFromJSON(int64(), 
"[1]")});
-    ARROW_RETURN_NOT_OK(select_prepared_statement->SetParameters(parameters));
 
-    ARROW_RETURN_NOT_OK(
-        Validate(GetQuerySchema(), select_prepared_statement->Execute(), 
sql_client));
+    ARROW_ASSIGN_OR_RAISE(auto select_prepared_statement,
+                          sql_client->Prepare({}, "SELECT PREPARED 
STATEMENT"));
+    ARROW_RETURN_NOT_OK(select_prepared_statement->SetParameters(parameters));
+    ARROW_ASSIGN_OR_RAISE(auto info, select_prepared_statement->Execute());
+    ARROW_RETURN_NOT_OK(Validate(GetQuerySchema(), *info, sql_client));
+    ARROW_ASSIGN_OR_RAISE(auto schema, 
select_prepared_statement->GetSchema({}));
+    ARROW_RETURN_NOT_OK(ValidateSchema(GetQuerySchema(), *schema));
     ARROW_RETURN_NOT_OK(select_prepared_statement->Close());
 
     ARROW_ASSIGN_OR_RAISE(auto update_prepared_statement,
-                          sql_client->Prepare(options, "UPDATE PREPARED 
STATEMENT"));
-    ARROW_ASSIGN_OR_RAISE(auto update_prepared_statement_result,
-                          update_prepared_statement->ExecuteUpdate());
-    if (update_prepared_statement_result != 
kUpdatePreparedStatementExpectedRows) {
-      return Status::Invalid("Expected 'UPDATE STATEMENT' return ",
-                             kUpdatePreparedStatementExpectedRows, ", got ",
-                             update_prepared_statement_result);
-    }
+                          sql_client->Prepare({}, "UPDATE PREPARED 
STATEMENT"));
+    ARROW_ASSIGN_OR_RAISE(auto updated_rows, 
update_prepared_statement->ExecuteUpdate());
+    ARROW_RETURN_NOT_OK(
+        AssertEq(kUpdatePreparedStatementExpectedRows, updated_rows,
+                 "Wrong number of updated rows for prepared statement 
ExecuteUpdate"));
     ARROW_RETURN_NOT_OK(update_prepared_statement->Close());
 
     return Status::OK();
diff --git a/cpp/src/arrow/flight/sql/client.cc 
b/cpp/src/arrow/flight/sql/client.cc
index 10ff1eea6f..e299b7ceb1 100644
--- a/cpp/src/arrow/flight/sql/client.cc
+++ b/cpp/src/arrow/flight/sql/client.cc
@@ -36,15 +36,45 @@ namespace arrow {
 namespace flight {
 namespace sql {
 
+namespace {
+arrow::Result<FlightDescriptor> GetFlightDescriptorForCommand(
+    const google::protobuf::Message& command) {
+  google::protobuf::Any any;
+  if (!any.PackFrom(command)) {
+    return Status::SerializationError("Failed to pack ", 
command.GetTypeName());
+  }
+
+  std::string buf;
+  if (!any.SerializeToString(&buf)) {
+    return Status::SerializationError("Failed to serialize ", 
command.GetTypeName());
+  }
+  return FlightDescriptor::Command(buf);
+}
+
+arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
+    FlightSqlClient* client, const FlightCallOptions& options,
+    const google::protobuf::Message& command) {
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
+  return client->GetFlightInfo(options, descriptor);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> GetSchemaForCommand(
+    FlightSqlClient* client, const FlightCallOptions& options,
+    const google::protobuf::Message& command) {
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
+  return client->GetSchema(options, descriptor);
+}
+}  // namespace
+
 FlightSqlClient::FlightSqlClient(std::shared_ptr<FlightClient> client)
     : impl_(std::move(client)) {}
 
 PreparedStatement::PreparedStatement(FlightSqlClient* client, std::string 
handle,
                                      std::shared_ptr<Schema> dataset_schema,
-                                     std::shared_ptr<Schema> parameter_schema,
-                                     FlightCallOptions options)
+                                     std::shared_ptr<Schema> parameter_schema)
     : client_(client),
-      options_(std::move(options)),
       handle_(std::move(handle)),
       dataset_schema_(std::move(dataset_schema)),
       parameter_schema_(std::move(parameter_schema)),
@@ -59,30 +89,20 @@ PreparedStatement::~PreparedStatement() {
   }
 }
 
-inline FlightDescriptor GetFlightDescriptorForCommand(
-    const google::protobuf::Message& command) {
-  google::protobuf::Any any;
-  any.PackFrom(command);
-
-  const std::string& string = any.SerializeAsString();
-  return FlightDescriptor::Command(string);
-}
-
-arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
-    FlightSqlClient& client, const FlightCallOptions& options,
-    const google::protobuf::Message& command) {
-  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::Execute(
+    const FlightCallOptions& options, const std::string& query) {
+  flight_sql_pb::CommandStatementQuery command;
+  command.set_query(query);
 
-  ARROW_ASSIGN_OR_RAISE(auto flight_info, client.GetFlightInfo(options, 
descriptor));
-  return std::move(flight_info);
+  return GetFlightInfoForCommand(this, options, command);
 }
 
-arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::Execute(
+arrow::Result<std::unique_ptr<SchemaResult>> FlightSqlClient::GetExecuteSchema(
     const FlightCallOptions& options, const std::string& query) {
   flight_sql_pb::CommandStatementQuery command;
   command.set_query(query);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<int64_t> FlightSqlClient::ExecuteUpdate(const FlightCallOptions& 
options,
@@ -90,7 +110,8 @@ arrow::Result<int64_t> FlightSqlClient::ExecuteUpdate(const 
FlightCallOptions& o
   flight_sql_pb::CommandStatementUpdate command;
   command.set_query(query);
 
-  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
 
   std::unique_ptr<FlightStreamWriter> writer;
   std::unique_ptr<FlightMetadataReader> reader;
@@ -114,8 +135,13 @@ arrow::Result<int64_t> 
FlightSqlClient::ExecuteUpdate(const FlightCallOptions& o
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCatalogs(
     const FlightCallOptions& options) {
   flight_sql_pb::CommandGetCatalogs command;
+  return GetFlightInfoForCommand(this, options, command);
+}
 
-  return GetFlightInfoForCommand(*this, options, command);
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetCatalogsSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetCatalogs command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetDbSchemas(
@@ -129,7 +155,13 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetDbSchemas(
     command.set_db_schema_filter_pattern(*db_schema_filter_pattern);
   }
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetDbSchemasSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetDbSchemas command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetTables(
@@ -158,7 +190,14 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetTables(
     }
   }
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> FlightSqlClient::GetTablesSchema(
+    const FlightCallOptions& options, bool include_schema) {
+  flight_sql_pb::CommandGetTables command;
+  command.set_include_schema(include_schema);
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetPrimaryKeys(
@@ -175,7 +214,13 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetPrimaryKeys(
 
   command.set_table(table_ref.table);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetPrimaryKeysSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetPrimaryKeys command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetExportedKeys(
@@ -192,7 +237,13 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetExportedKeys(
 
   command.set_table(table_ref.table);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetExportedKeysSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetExportedKeys command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetImportedKeys(
@@ -209,7 +260,13 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetImportedKeys(
 
   command.set_table(table_ref.table);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetImportedKeysSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetImportedKeys command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCrossReference(
@@ -233,21 +290,33 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetCrossReference(
   }
   command.set_fk_table(fk_table_ref.table);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetCrossReferenceSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetCrossReference command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetTableTypes(
     const FlightCallOptions& options) {
   flight_sql_pb::CommandGetTableTypes command;
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetTableTypesSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetTableTypes command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetXdbcTypeInfo(
     const FlightCallOptions& options) {
   flight_sql_pb::CommandGetXdbcTypeInfo command;
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetXdbcTypeInfo(
@@ -256,7 +325,27 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlClient::GetXdbcTypeInfo(
 
   command.set_data_type(data_type);
 
-  return GetFlightInfoForCommand(*this, options, command);
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetXdbcTypeInfoSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetXdbcTypeInfo command;
+  return GetSchemaForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetSqlInfo(
+    const FlightCallOptions& options, const std::vector<int>& sql_info) {
+  flight_sql_pb::CommandGetSqlInfo command;
+  for (const int& info : sql_info) command.add_info(info);
+
+  return GetFlightInfoForCommand(this, options, command);
+}
+
+arrow::Result<std::unique_ptr<SchemaResult>> FlightSqlClient::GetSqlInfoSchema(
+    const FlightCallOptions& options) {
+  flight_sql_pb::CommandGetSqlInfo command;
+  return GetSchemaForCommand(this, options, command);
 }
 
 arrow::Result<std::unique_ptr<FlightStreamReader>> FlightSqlClient::DoGet(
@@ -319,28 +408,24 @@ arrow::Result<std::shared_ptr<PreparedStatement>> 
FlightSqlClient::Prepare(
   auto handle = prepared_statement_result.prepared_statement_handle();
 
   return std::make_shared<PreparedStatement>(this, handle, dataset_schema,
-                                             parameter_schema, options);
+                                             parameter_schema);
 }
 
-arrow::Result<std::unique_ptr<FlightInfo>> PreparedStatement::Execute() {
+arrow::Result<std::unique_ptr<FlightInfo>> PreparedStatement::Execute(
+    const FlightCallOptions& options) {
   if (is_closed_) {
     return Status::Invalid("Statement already closed.");
   }
 
-  flight_sql_pb::CommandPreparedStatementQuery execute_query_command;
-
-  execute_query_command.set_prepared_statement_handle(handle_);
-
-  google::protobuf::Any any;
-  any.PackFrom(execute_query_command);
-
-  const std::string& string = any.SerializeAsString();
-  const FlightDescriptor descriptor = FlightDescriptor::Command(string);
+  flight_sql_pb::CommandPreparedStatementQuery command;
+  command.set_prepared_statement_handle(handle_);
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
 
   if (parameter_binding_ && parameter_binding_->num_rows() > 0) {
     std::unique_ptr<FlightStreamWriter> writer;
     std::unique_ptr<FlightMetadataReader> reader;
-    ARROW_RETURN_NOT_OK(client_->DoPut(options_, descriptor, 
parameter_binding_->schema(),
+    ARROW_RETURN_NOT_OK(client_->DoPut(options, descriptor, 
parameter_binding_->schema(),
                                        &writer, &reader));
 
     ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*parameter_binding_));
@@ -350,28 +435,30 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
PreparedStatement::Execute() {
     ARROW_RETURN_NOT_OK(reader->ReadMetadata(&buffer));
   }
 
-  ARROW_ASSIGN_OR_RAISE(auto flight_info, client_->GetFlightInfo(options_, 
descriptor));
+  ARROW_ASSIGN_OR_RAISE(auto flight_info, client_->GetFlightInfo(options, 
descriptor));
   return std::move(flight_info);
 }
 
-arrow::Result<int64_t> PreparedStatement::ExecuteUpdate() {
+arrow::Result<int64_t> PreparedStatement::ExecuteUpdate(
+    const FlightCallOptions& options) {
   if (is_closed_) {
     return Status::Invalid("Statement already closed.");
   }
 
   flight_sql_pb::CommandPreparedStatementUpdate command;
   command.set_prepared_statement_handle(handle_);
-  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
   std::unique_ptr<FlightStreamWriter> writer;
   std::unique_ptr<FlightMetadataReader> reader;
 
   if (parameter_binding_ && parameter_binding_->num_rows() > 0) {
-    ARROW_RETURN_NOT_OK(client_->DoPut(options_, descriptor, 
parameter_binding_->schema(),
+    ARROW_RETURN_NOT_OK(client_->DoPut(options, descriptor, 
parameter_binding_->schema(),
                                        &writer, &reader));
     ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*parameter_binding_));
   } else {
     const std::shared_ptr<Schema> schema = arrow::schema({});
-    ARROW_RETURN_NOT_OK(client_->DoPut(options_, descriptor, schema, &writer, 
&reader));
+    ARROW_RETURN_NOT_OK(client_->DoPut(options, descriptor, schema, &writer, 
&reader));
     const ArrayVector columns;
     const auto& record_batch = arrow::RecordBatch::Make(schema, 0, columns);
     ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
@@ -406,7 +493,20 @@ std::shared_ptr<Schema> 
PreparedStatement::parameter_schema() const {
   return parameter_schema_;
 }
 
-Status PreparedStatement::Close() {
+arrow::Result<std::unique_ptr<SchemaResult>> PreparedStatement::GetSchema(
+    const FlightCallOptions& options) {
+  if (is_closed_) {
+    return Status::Invalid("Statement already closed");
+  }
+
+  flight_sql_pb::CommandPreparedStatementQuery command;
+  command.set_prepared_statement_handle(handle_);
+  ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
+                        GetFlightDescriptorForCommand(command));
+  return client_->GetSchema(options, descriptor);
+}
+
+Status PreparedStatement::Close(const FlightCallOptions& options) {
   if (is_closed_) {
     return Status::Invalid("Statement already closed.");
   }
@@ -422,7 +522,7 @@ Status PreparedStatement::Close() {
 
   std::unique_ptr<ResultStream> results;
 
-  ARROW_RETURN_NOT_OK(client_->DoAction(options_, action, &results));
+  ARROW_RETURN_NOT_OK(client_->DoAction(options, action, &results));
 
   is_closed_ = true;
 
@@ -431,14 +531,6 @@ Status PreparedStatement::Close() {
 
 Status FlightSqlClient::Close() { return impl_->Close(); }
 
-arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetSqlInfo(
-    const FlightCallOptions& options, const std::vector<int>& sql_info) {
-  flight_sql_pb::CommandGetSqlInfo command;
-  for (const int& info : sql_info) command.add_info(info);
-
-  return GetFlightInfoForCommand(*this, options, command);
-}
-
 }  // namespace sql
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/flight/sql/client.h 
b/cpp/src/arrow/flight/sql/client.h
index 7c8cb640e8..26315e0d23 100644
--- a/cpp/src/arrow/flight/sql/client.h
+++ b/cpp/src/arrow/flight/sql/client.h
@@ -54,6 +54,10 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> Execute(const FlightCallOptions& 
options,
                                                      const std::string& query);
 
+  /// \brief Get the result set schema from the server.
+  arrow::Result<std::unique_ptr<SchemaResult>> GetExecuteSchema(
+      const FlightCallOptions& options, const std::string& query);
+
   /// \brief Execute an update query on the server.
   /// \param[in] options      RPC-layer hints for this call.
   /// \param[in] query        The query to be executed in the UTF-8 format.
@@ -67,6 +71,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetCatalogs(
       const FlightCallOptions& options);
 
+  /// \brief Get the catalogs schema from the server (should be
+  ///   identical to SqlSchema::GetCatalogsSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetCatalogsSchema(
+      const FlightCallOptions& options);
+
   /// \brief Request a list of database schemas.
   /// \param[in] options                   RPC-layer hints for this call.
   /// \param[in] catalog                   The catalog.
@@ -76,6 +85,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
       const FlightCallOptions& options, const std::string* catalog,
       const std::string* db_schema_filter_pattern);
 
+  /// \brief Get the database schemas schema from the server (should be
+  ///   identical to SqlSchema::GetDbSchemasSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetDbSchemasSchema(
+      const FlightCallOptions& options);
+
   /// \brief Given a flight ticket and schema, request to be sent the
   /// stream. Returns record batch stream reader
   /// \param[in] options Per-RPC options
@@ -99,6 +113,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
       const std::string* table_filter_pattern, bool include_schema,
       const std::vector<std::string>* table_types);
 
+  /// \brief Get the tables schema from the server (should be
+  ///   identical to SqlSchema::GetTablesSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetTablesSchema(
+      const FlightCallOptions& options, bool include_schema);
+
   /// \brief Request the primary keys for a table.
   /// \param[in] options          RPC-layer hints for this call.
   /// \param[in] table_ref        The table reference.
@@ -106,6 +125,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetPrimaryKeys(
       const FlightCallOptions& options, const TableRef& table_ref);
 
+  /// \brief Get the primary keys schema from the server (should be
+  ///   identical to SqlSchema::GetPrimaryKeysSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetPrimaryKeysSchema(
+      const FlightCallOptions& options);
+
   /// \brief Retrieves a description about the foreign key columns that 
reference the
   /// primary key columns of the given table.
   /// \param[in] options          RPC-layer hints for this call.
@@ -114,6 +138,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetExportedKeys(
       const FlightCallOptions& options, const TableRef& table_ref);
 
+  /// \brief Get the exported keys schema from the server (should be
+  ///   identical to SqlSchema::GetExportedKeysSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetExportedKeysSchema(
+      const FlightCallOptions& options);
+
   /// \brief Retrieves the foreign key columns for the given table.
   /// \param[in] options          RPC-layer hints for this call.
   /// \param[in] table_ref        The table reference.
@@ -121,6 +150,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetImportedKeys(
       const FlightCallOptions& options, const TableRef& table_ref);
 
+  /// \brief Get the imported keys schema from the server (should be
+  ///   identical to SqlSchema::GetImportedKeysSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetImportedKeysSchema(
+      const FlightCallOptions& options);
+
   /// \brief Retrieves a description of the foreign key columns in the given 
foreign key
   ///        table that reference the primary key or the columns representing 
a unique
   ///        constraint of the parent table (could be the same or a different 
table).
@@ -132,12 +166,22 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
       const FlightCallOptions& options, const TableRef& pk_table_ref,
       const TableRef& fk_table_ref);
 
+  /// \brief Get the cross reference schema from the server (should be
+  ///   identical to SqlSchema::GetCrossReferenceSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetCrossReferenceSchema(
+      const FlightCallOptions& options);
+
   /// \brief Request a list of table types.
   /// \param[in] options          RPC-layer hints for this call.
   /// \return The FlightInfo describing where to access the dataset.
   arrow::Result<std::unique_ptr<FlightInfo>> GetTableTypes(
       const FlightCallOptions& options);
 
+  /// \brief Get the table types schema from the server (should be
+  ///   identical to SqlSchema::GetTableTypesSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetTableTypesSchema(
+      const FlightCallOptions& options);
+
   /// \brief Request the information about all the data types supported.
   /// \param[in] options          RPC-layer hints for this call.
   /// \return The FlightInfo describing where to access the dataset.
@@ -151,6 +195,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetXdbcTypeInfo(
       const FlightCallOptions& options, int data_type);
 
+  /// \brief Get the type info schema from the server (should be
+  ///   identical to SqlSchema::GetXdbcTypeInfoSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetXdbcTypeInfoSchema(
+      const FlightCallOptions& options);
+
   /// \brief Request a list of SQL information.
   /// \param[in] options RPC-layer hints for this call.
   /// \param[in] sql_info the SQL info required.
@@ -158,6 +207,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::unique_ptr<FlightInfo>> GetSqlInfo(const 
FlightCallOptions& options,
                                                         const 
std::vector<int>& sql_info);
 
+  /// \brief Get the SQL information schema from the server (should be
+  ///   identical to SqlSchema::GetSqlInfoSchema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetSqlInfoSchema(
+      const FlightCallOptions& options);
+
   /// \brief Create a prepared statement object.
   /// \param[in] options              RPC-layer hints for this call.
   /// \param[in] query                The query that will be executed.
@@ -165,17 +219,18 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   arrow::Result<std::shared_ptr<PreparedStatement>> Prepare(
       const FlightCallOptions& options, const std::string& query);
 
-  /// \brief Retrieve the FlightInfo.
-  /// \param[in] options      RPC-layer hints for this call.
-  /// \param[in] descriptor   The flight descriptor.
-  /// \return The flight info with the metadata.
-  // NOTE: This is public because it is been used by the anonymous
-  // function GetFlightInfoForCommand.
+  /// \brief Call the underlying Flight client's GetFlightInfo.
   virtual arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfo(
       const FlightCallOptions& options, const FlightDescriptor& descriptor) {
     return impl_->GetFlightInfo(options, descriptor);
   }
 
+  /// \brief Call the underlying Flight client's GetSchema.
+  virtual arrow::Result<std::unique_ptr<SchemaResult>> GetSchema(
+      const FlightCallOptions& options, const FlightDescriptor& descriptor) {
+    return impl_->GetSchema(options, descriptor);
+  }
+
   /// \brief Explicitly shut down and clean up the client.
   Status Close();
 
@@ -212,10 +267,9 @@ class ARROW_FLIGHT_SQL_EXPORT PreparedStatement {
   /// \param[in] handle                Handle for this prepared statement.
   /// \param[in] dataset_schema        Schema of the resulting dataset.
   /// \param[in] parameter_schema      Schema of the parameters (if any).
-  /// \param[in] options               RPC-layer hints for this call.
   PreparedStatement(FlightSqlClient* client, std::string handle,
                     std::shared_ptr<Schema> dataset_schema,
-                    std::shared_ptr<Schema> parameter_schema, 
FlightCallOptions options);
+                    std::shared_ptr<Schema> parameter_schema);
 
   /// \brief Default destructor for the PreparedStatement class.
   /// The destructor will call the Close method from the class in order,
@@ -226,11 +280,12 @@ class ARROW_FLIGHT_SQL_EXPORT PreparedStatement {
 
   /// \brief Executes the prepared statement query on the server.
   /// \return A FlightInfo object representing the stream(s) to fetch.
-  arrow::Result<std::unique_ptr<FlightInfo>> Execute();
+  arrow::Result<std::unique_ptr<FlightInfo>> Execute(
+      const FlightCallOptions& options = {});
 
   /// \brief Executes the prepared statement update query on the server.
   /// \return The number of rows affected.
-  arrow::Result<int64_t> ExecuteUpdate();
+  arrow::Result<int64_t> ExecuteUpdate(const FlightCallOptions& options = {});
 
   /// \brief Retrieve the parameter schema from the query.
   /// \return The parameter schema from the query.
@@ -245,10 +300,15 @@ class ARROW_FLIGHT_SQL_EXPORT PreparedStatement {
   /// \return                     Status.
   Status SetParameters(std::shared_ptr<RecordBatch> parameter_binding);
 
+  /// \brief Re-request the result set schema from the server (should
+  ///   be identical to dataset_schema).
+  arrow::Result<std::unique_ptr<SchemaResult>> GetSchema(
+      const FlightCallOptions& options = {});
+
   /// \brief Close the prepared statement, so that this PreparedStatement can 
not used
   /// anymore and server can free up any resources.
   /// \return Status.
-  Status Close();
+  Status Close(const FlightCallOptions& options = {});
 
   /// \brief Check if the prepared statement is closed.
   /// \return The state of the prepared statement.
@@ -256,7 +316,6 @@ class ARROW_FLIGHT_SQL_EXPORT PreparedStatement {
 
  private:
   FlightSqlClient* client_;
-  FlightCallOptions options_;
   std::string handle_;
   std::shared_ptr<Schema> dataset_schema_;
   std::shared_ptr<Schema> parameter_schema_;
diff --git a/cpp/src/arrow/flight/sql/server.cc 
b/cpp/src/arrow/flight/sql/server.cc
index 0ebe647ba1..78fbff0c33 100644
--- a/cpp/src/arrow/flight/sql/server.cc
+++ b/cpp/src/arrow/flight/sql/server.cc
@@ -344,6 +344,72 @@ Status FlightSqlServerBase::GetFlightInfo(const 
ServerCallContext& context,
   return Status::Invalid("The defined request is invalid.");
 }
 
+Status FlightSqlServerBase::GetSchema(const ServerCallContext& context,
+                                      const FlightDescriptor& request,
+                                      std::unique_ptr<SchemaResult>* schema) {
+  google::protobuf::Any any;
+  if (!any.ParseFromArray(request.cmd.data(), 
static_cast<int>(request.cmd.size()))) {
+    return Status::Invalid("Unable to parse command");
+  }
+
+  if (any.Is<pb::sql::CommandStatementQuery>()) {
+    ARROW_ASSIGN_OR_RAISE(StatementQuery internal_command,
+                          ParseCommandStatementQuery(any));
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          GetSchemaStatement(context, internal_command, 
request));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandPreparedStatementQuery>()) {
+    ARROW_ASSIGN_OR_RAISE(PreparedStatementQuery internal_command,
+                          ParseCommandPreparedStatementQuery(any));
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          GetSchemaPreparedStatement(context, 
internal_command, request));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetCatalogs>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema, 
SchemaResult::Make(*SqlSchema::GetCatalogsSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetCrossReference>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          
SchemaResult::Make(*SqlSchema::GetCrossReferenceSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetDbSchemas>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema, 
SchemaResult::Make(*SqlSchema::GetDbSchemasSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetExportedKeys>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          
SchemaResult::Make(*SqlSchema::GetExportedKeysSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetImportedKeys>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          
SchemaResult::Make(*SqlSchema::GetImportedKeysSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetPrimaryKeys>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          
SchemaResult::Make(*SqlSchema::GetPrimaryKeysSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetSqlInfo>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema, 
SchemaResult::Make(*SqlSchema::GetSqlInfoSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetTables>()) {
+    ARROW_ASSIGN_OR_RAISE(GetTables command, ParseCommandGetTables(any));
+    if (command.include_schema) {
+      ARROW_ASSIGN_OR_RAISE(
+          *schema, 
SchemaResult::Make(*SqlSchema::GetTablesSchemaWithIncludedSchema()));
+    } else {
+      ARROW_ASSIGN_OR_RAISE(*schema, 
SchemaResult::Make(*SqlSchema::GetTablesSchema()));
+    }
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetTableTypes>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema, 
SchemaResult::Make(*SqlSchema::GetTableTypesSchema()));
+    return Status::OK();
+  } else if (any.Is<pb::sql::CommandGetXdbcTypeInfo>()) {
+    ARROW_ASSIGN_OR_RAISE(*schema,
+                          
SchemaResult::Make(*SqlSchema::GetXdbcTypeInfoSchema()));
+    return Status::OK();
+  }
+
+  return Status::NotImplemented("Command not recognized: ", any.type_url());
+}
+
 Status FlightSqlServerBase::DoGet(const ServerCallContext& context, const 
Ticket& request,
                                   std::unique_ptr<FlightDataStream>* stream) {
   google::protobuf::Any any;
@@ -531,6 +597,12 @@ arrow::Result<std::unique_ptr<FlightInfo>> 
FlightSqlServerBase::GetFlightInfoSta
   return Status::NotImplemented("GetFlightInfoStatement not implemented");
 }
 
+arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlServerBase::GetSchemaStatement(
+    const ServerCallContext& context, const StatementQuery& command,
+    const FlightDescriptor& descriptor) {
+  return Status::NotImplemented("GetSchemaStatement not implemented");
+}
+
 arrow::Result<std::unique_ptr<FlightDataStream>> 
FlightSqlServerBase::DoGetStatement(
     const ServerCallContext& context, const StatementQueryTicket& command) {
   return Status::NotImplemented("DoGetStatement not implemented");
@@ -543,6 +615,13 @@ FlightSqlServerBase::GetFlightInfoPreparedStatement(const 
ServerCallContext& con
   return Status::NotImplemented("GetFlightInfoPreparedStatement not 
implemented");
 }
 
+arrow::Result<std::unique_ptr<SchemaResult>>
+FlightSqlServerBase::GetSchemaPreparedStatement(const ServerCallContext& 
context,
+                                                const PreparedStatementQuery& 
command,
+                                                const FlightDescriptor& 
descriptor) {
+  return Status::NotImplemented("GetSchemaPreparedStatement not implemented");
+}
+
 arrow::Result<std::unique_ptr<FlightDataStream>>
 FlightSqlServerBase::DoGetPreparedStatement(const ServerCallContext& context,
                                             const PreparedStatementQuery& 
command) {
diff --git a/cpp/src/arrow/flight/sql/server.h 
b/cpp/src/arrow/flight/sql/server.h
index f077c5d5d5..49e239a0cd 100644
--- a/cpp/src/arrow/flight/sql/server.h
+++ b/cpp/src/arrow/flight/sql/server.h
@@ -28,6 +28,7 @@
 #include "arrow/flight/sql/server.h"
 #include "arrow/flight/sql/types.h"
 #include "arrow/flight/sql/visibility.h"
+#include "arrow/flight/types.h"
 #include "arrow/util/optional.h"
 
 namespace arrow {
@@ -221,6 +222,25 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public 
FlightServerBase {
   virtual arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoCatalogs(
       const ServerCallContext& context, const FlightDescriptor& descriptor);
 
+  /// \brief Get the schema of the result set of a query.
+  /// \param[in] context      Per-call context.
+  /// \param[in] command      The StatementQuery containing the SQL query.
+  /// \param[in] descriptor   The descriptor identifying the data stream.
+  /// \return                 The schema of the result set.
+  virtual arrow::Result<std::unique_ptr<SchemaResult>> GetSchemaStatement(
+      const ServerCallContext& context, const StatementQuery& command,
+      const FlightDescriptor& descriptor);
+
+  /// \brief Get the schema of the result set of a prepared statement.
+  /// \param[in] context      Per-call context.
+  /// \param[in] command      The PreparedStatementQuery containing the
+  ///                         prepared statement handle.
+  /// \param[in] descriptor   The descriptor identifying the data stream.
+  /// \return                 The schema of the result set.
+  virtual arrow::Result<std::unique_ptr<SchemaResult>> 
GetSchemaPreparedStatement(
+      const ServerCallContext& context, const PreparedStatementQuery& command,
+      const FlightDescriptor& descriptor);
+
   /// \brief Get a FlightDataStream containing the list of catalogs.
   /// \param[in] context  Per-call context.
   /// \return             An interface for sending data back to the client.
@@ -462,6 +482,9 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public 
FlightServerBase {
   Status GetFlightInfo(const ServerCallContext& context, const 
FlightDescriptor& request,
                        std::unique_ptr<FlightInfo>* info) final;
 
+  Status GetSchema(const ServerCallContext& context, const FlightDescriptor& 
request,
+                   std::unique_ptr<SchemaResult>* schema) override;
+
   Status DoGet(const ServerCallContext& context, const Ticket& request,
                std::unique_ptr<FlightDataStream>* stream) final;
 
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
index ddb8a036fb..6e80f40cfb 100644
--- a/cpp/src/arrow/flight/types.cc
+++ b/cpp/src/arrow/flight/types.cc
@@ -28,6 +28,7 @@
 #include "arrow/ipc/reader.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
+#include "arrow/util/make_unique.h"
 #include "arrow/util/string_view.h"
 #include "arrow/util/uri.h"
 
@@ -150,10 +151,10 @@ arrow::Result<std::shared_ptr<Schema>> 
SchemaResult::GetSchema(
   return ipc::ReadSchema(&schema_reader, dictionary_memo);
 }
 
-arrow::Result<SchemaResult> SchemaResult::Make(const Schema& schema) {
+arrow::Result<std::unique_ptr<SchemaResult>> SchemaResult::Make(const Schema& 
schema) {
   std::string schema_in;
   RETURN_NOT_OK(internal::SchemaToString(schema, &schema_in));
-  return SchemaResult(std::move(schema_in));
+  return arrow::internal::make_unique<SchemaResult>(std::move(schema_in));
 }
 
 Status SchemaResult::GetSchema(ipc::DictionaryMemo* dictionary_memo,
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index a061f33afe..2ec24ff586 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -397,7 +397,7 @@ struct ARROW_FLIGHT_EXPORT SchemaResult {
   explicit SchemaResult(std::string schema) : raw_schema_(std::move(schema)) {}
 
   /// \brief Factory method to construct a SchemaResult.
-  static arrow::Result<SchemaResult> Make(const Schema& schema);
+  static arrow::Result<std::unique_ptr<SchemaResult>> Make(const Schema& 
schema);
 
   /// \brief return schema
   /// \param[in,out] dictionary_memo for dictionary bookkeeping, will
diff --git a/cpp/src/arrow/python/flight.cc b/cpp/src/arrow/python/flight.cc
index 9077bbe4ac..bf7af27ac7 100644
--- a/cpp/src/arrow/python/flight.cc
+++ b/cpp/src/arrow/python/flight.cc
@@ -380,10 +380,7 @@ Status CreateFlightInfo(const 
std::shared_ptr<arrow::Schema>& schema,
 
 Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema,
                           std::unique_ptr<arrow::flight::SchemaResult>* out) {
-  ARROW_ASSIGN_OR_RAISE(auto result, 
arrow::flight::SchemaResult::Make(*schema));
-  *out = std::unique_ptr<arrow::flight::SchemaResult>(
-      new arrow::flight::SchemaResult(std::move(result)));
-  return Status::OK();
+  return arrow::flight::SchemaResult::Make(*schema).Value(out);
 }
 
 }  // namespace flight
diff --git a/go/arrow/flight/flightsql/client.go 
b/go/arrow/flight/flightsql/client.go
index 5f7f693d2b..b8ee01cfde 100644
--- a/go/arrow/flight/flightsql/client.go
+++ b/go/arrow/flight/flightsql/client.go
@@ -77,6 +77,14 @@ func flightInfoForCommand(ctx context.Context, cl *Client, 
cmd proto.Message, op
        return cl.getFlightInfo(ctx, desc, opts...)
 }
 
+func schemaForCommand(ctx context.Context, cl *Client, cmd proto.Message, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       desc, err := descForCommand(cmd)
+       if err != nil {
+               return nil, err
+       }
+       return cl.getSchema(ctx, desc, opts...)
+}
+
 // Execute executes the desired query on the server and returns a FlightInfo
 // object describing where to retrieve the results.
 func (c *Client) Execute(ctx context.Context, query string, opts 
...grpc.CallOption) (*flight.FlightInfo, error) {
@@ -84,6 +92,13 @@ func (c *Client) Execute(ctx context.Context, query string, 
opts ...grpc.CallOpt
        return flightInfoForCommand(ctx, c, &cmd, opts...)
 }
 
+// GetExecuteSchema gets the schema of the result set of a query without
+// executing the query itself.
+func (c *Client) GetExecuteSchema(ctx context.Context, query string, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       cmd := pb.CommandStatementQuery{Query: query}
+       return schemaForCommand(ctx, c, &cmd, opts...)
+}
+
 // ExecuteUpdate is for executing an update query and only returns the number 
of affected rows.
 func (c *Client) ExecuteUpdate(ctx context.Context, query string, opts 
...grpc.CallOption) (n int64, err error) {
        var (
@@ -128,12 +143,22 @@ func (c *Client) GetCatalogs(ctx context.Context, opts 
...grpc.CallOption) (*fli
        return flightInfoForCommand(ctx, c, &pb.CommandGetCatalogs{}, opts...)
 }
 
+// GetCatalogsSchema requests the schema of GetCatalogs from the server
+func (c *Client) GetCatalogsSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetCatalogs{}, opts...)
+}
+
 // GetDBSchemas requests the list of schemas from the database and
 // returns a FlightInfo object where the response can be retrieved
 func (c *Client) GetDBSchemas(ctx context.Context, cmdOpts *GetDBSchemasOpts, 
opts ...grpc.CallOption) (*flight.FlightInfo, error) {
        return flightInfoForCommand(ctx, c, (*pb.CommandGetDbSchemas)(cmdOpts), 
opts...)
 }
 
+// GetDBSchemasSchema requests the schema of GetDBSchemas from the server
+func (c *Client) GetDBSchemasSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetDbSchemas{}, opts...)
+}
+
 // DoGet uses the provided flight ticket to request the stream of data.
 // It returns a recordbatch reader to stream the results. Release
 // should be called on the reader when done.
@@ -154,6 +179,11 @@ func (c *Client) GetTables(ctx context.Context, reqOptions 
*GetTablesOpts, opts
        return flightInfoForCommand(ctx, c, (*pb.CommandGetTables)(reqOptions), 
opts...)
 }
 
+// GetTablesSchema requests the schema of GetTables from the server.
+func (c *Client) GetTablesSchema(ctx context.Context, reqOptions 
*GetTablesOpts, opts ...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, (*pb.CommandGetTables)(reqOptions), 
opts...)
+}
+
 // GetPrimaryKeys requests the primary keys for a specific table from the
 // server, specified using a TableRef. Returns a FlightInfo object where
 // the response can be retrieved.
@@ -166,6 +196,11 @@ func (c *Client) GetPrimaryKeys(ctx context.Context, ref 
TableRef, opts ...grpc.
        return flightInfoForCommand(ctx, c, &cmd, opts...)
 }
 
+// GetPrimaryKeysSchema requests the schema of GetPrimaryKeys from the server.
+func (c *Client) GetPrimaryKeysSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetPrimaryKeys{}, opts...)
+}
+
 // GetExportedKeys retrieves a description about the foreign key columns
 // that reference the primary key columns of the specified table. Returns
 // a FlightInfo object where the response can be retrieved.
@@ -178,6 +213,11 @@ func (c *Client) GetExportedKeys(ctx context.Context, ref 
TableRef, opts ...grpc
        return flightInfoForCommand(ctx, c, &cmd, opts...)
 }
 
+// GetExportedKeysSchema requests the schema of GetExportedKeys from the 
server.
+func (c *Client) GetExportedKeysSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetExportedKeys{}, opts...)
+}
+
 // GetImportedKeys returns the foreign key columns for the specified table.
 // Returns a FlightInfo object indicating where the response can be retrieved.
 func (c *Client) GetImportedKeys(ctx context.Context, ref TableRef, opts 
...grpc.CallOption) (*flight.FlightInfo, error) {
@@ -189,6 +229,11 @@ func (c *Client) GetImportedKeys(ctx context.Context, ref 
TableRef, opts ...grpc
        return flightInfoForCommand(ctx, c, &cmd, opts...)
 }
 
+// GetImportedKeysSchema requests the schema of GetImportedKeys from the 
server.
+func (c *Client) GetImportedKeysSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetImportedKeys{}, opts...)
+}
+
 // GetCrossReference retrieves a description of the foreign key columns
 // in the specified ForeignKey table that reference the primary key or
 // columns representing a restraint of the parent table (could be the same
@@ -206,6 +251,11 @@ func (c *Client) GetCrossReference(ctx context.Context, 
pkTable, fkTable TableRe
        return flightInfoForCommand(ctx, c, &cmd, opts...)
 }
 
+// GetCrossReferenceSchema requests the schema of GetCrossReference from the 
server.
+func (c *Client) GetCrossReferenceSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetCrossReference{}, opts...)
+}
+
 // GetTableTypes requests a list of the types of tables available on this
 // server. Returns a FlightInfo object indicating where the response can
 // be retrieved.
@@ -213,6 +263,11 @@ func (c *Client) GetTableTypes(ctx context.Context, opts 
...grpc.CallOption) (*f
        return flightInfoForCommand(ctx, c, &pb.CommandGetTableTypes{}, opts...)
 }
 
+// GetTableTypesSchema requests the schema of GetTableTypes from the server.
+func (c *Client) GetTableTypesSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetTableTypes{}, opts...)
+}
+
 // GetXdbcTypeInfo requests the information about all the data types supported
 // (dataType == nil) or a specific data type. Returns a FlightInfo object
 // indicating where the response can be retrieved.
@@ -220,6 +275,11 @@ func (c *Client) GetXdbcTypeInfo(ctx context.Context, 
dataType *int32, opts ...g
        return flightInfoForCommand(ctx, c, 
&pb.CommandGetXdbcTypeInfo{DataType: dataType}, opts...)
 }
 
+// GetXdbcTypeInfoSchema requests the schema of GetXdbcTypeInfo from the 
server.
+func (c *Client) GetXdbcTypeInfoSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetXdbcTypeInfo{}, opts...)
+}
+
 // GetSqlInfo returns a list of the requested SQL information corresponding
 // to the values in the info slice. Returns a FlightInfo object indicating
 // where the response can be retrieved.
@@ -232,6 +292,11 @@ func (c *Client) GetSqlInfo(ctx context.Context, info 
[]SqlInfo, opts ...grpc.Ca
        return flightInfoForCommand(ctx, c, cmd, opts...)
 }
 
+// GetSqlInfoSchema requests the schema of  GetSqlInfo from the server.
+func (c *Client) GetSqlInfoSchema(ctx context.Context, opts 
...grpc.CallOption) (*flight.SchemaResult, error) {
+       return schemaForCommand(ctx, c, &pb.CommandGetSqlInfo{}, opts...)
+}
+
 // Prepare creates a PreparedStatement object for the specified query.
 // The resulting PreparedStatement object should be Closed when no longer
 // needed. It will maintain a reference to this Client for use to execute
@@ -302,6 +367,10 @@ func (c *Client) getFlightInfo(ctx context.Context, desc 
*flight.FlightDescripto
        return c.Client.GetFlightInfo(ctx, desc, opts...)
 }
 
+func (c *Client) getSchema(ctx context.Context, desc *flight.FlightDescriptor, 
opts ...grpc.CallOption) (*flight.SchemaResult, error) {
+       return c.Client.GetSchema(ctx, desc, opts...)
+}
+
 // Close will close the underlying flight Client in use by this 
flightsql.Client
 func (c *Client) Close() error { return c.Client.Close() }
 
@@ -430,6 +499,25 @@ func (p *PreparedStatement) DatasetSchema() *arrow.Schema 
{ return p.datasetSche
 // the prepared statement.
 func (p *PreparedStatement) ParameterSchema() *arrow.Schema { return 
p.paramSchema }
 
+// GetSchema re-requests the schema of the result set of the prepared
+// statement from the server. It should otherwise be identical to 
DatasetSchema.
+//
+// Will error if already closed.
+func (p *PreparedStatement) GetSchema(ctx context.Context) 
(*flight.SchemaResult, error) {
+       if p.closed {
+               return nil, errors.New("arrow/flightsql: prepared statement 
already closed")
+       }
+
+       cmd := &pb.CommandPreparedStatementQuery{PreparedStatementHandle: 
p.handle}
+
+       desc, err := descForCommand(cmd)
+       if err != nil {
+               return nil, err
+       }
+
+       return p.client.getSchema(ctx, desc, p.opts...)
+}
+
 // SetParameters takes a record batch to send as the parameter bindings when
 // executing. It should match the schema from ParameterSchema.
 //
diff --git a/go/arrow/flight/flightsql/server.go 
b/go/arrow/flight/flightsql/server.go
index 17bc9e188a..8080df9e4b 100644
--- a/go/arrow/flight/flightsql/server.go
+++ b/go/arrow/flight/flightsql/server.go
@@ -181,6 +181,10 @@ func (BaseServer) GetFlightInfoStatement(context.Context, 
StatementQuery, *fligh
        return nil, status.Errorf(codes.Unimplemented, "GetFlightInfoStatement 
not implemented")
 }
 
+func (BaseServer) GetSchemaStatement(context.Context, StatementQuery, 
*flight.FlightDescriptor) (*flight.SchemaResult, error) {
+       return nil, status.Errorf(codes.Unimplemented, "GetSchemaStatement not 
implemented")
+}
+
 func (BaseServer) DoGetStatement(context.Context, StatementQueryTicket) 
(*arrow.Schema, <-chan flight.StreamChunk, error) {
        return nil, nil, status.Errorf(codes.Unimplemented, "DoGetStatement not 
implemented")
 }
@@ -189,6 +193,10 @@ func (BaseServer) 
GetFlightInfoPreparedStatement(context.Context, PreparedStatem
        return nil, status.Errorf(codes.Unimplemented, 
"GetFlightInfoPreparedStatement not implemented")
 }
 
+func (BaseServer) GetSchemaPreparedStatement(context.Context, 
PreparedStatementQuery, *flight.FlightDescriptor) (*flight.SchemaResult, error) 
{
+       return nil, status.Errorf(codes.Unimplemented, 
"GetSchemaPreparedStatement not implemented")
+}
+
 func (BaseServer) DoGetPreparedStatement(context.Context, 
PreparedStatementQuery) (*arrow.Schema, <-chan flight.StreamChunk, error) {
        return nil, nil, status.Errorf(codes.Unimplemented, 
"DoGetPreparedStatement not implemented")
 }
@@ -367,12 +375,17 @@ func (BaseServer) 
DoPutPreparedStatementUpdate(context.Context, PreparedStatemen
 type Server interface {
        // GetFlightInfoStatement returns a FlightInfo for executing the 
requested sql query
        GetFlightInfoStatement(context.Context, StatementQuery, 
*flight.FlightDescriptor) (*flight.FlightInfo, error)
+       // GetFlightInfoStatement returns the schema of the result set of the 
requested sql query
+       GetSchemaStatement(context.Context, StatementQuery, 
*flight.FlightDescriptor) (*flight.SchemaResult, error)
        // DoGetStatement returns a stream containing the query results for the
        // requested statement handle that was populated by 
GetFlightInfoStatement
        DoGetStatement(context.Context, StatementQueryTicket) (*arrow.Schema, 
<-chan flight.StreamChunk, error)
        // GetFlightInfoPreparedStatement returns a FlightInfo for executing an 
already
        // prepared statement with the provided statement handle.
        GetFlightInfoPreparedStatement(context.Context, PreparedStatementQuery, 
*flight.FlightDescriptor) (*flight.FlightInfo, error)
+       // GetSchemaPreparedStatement returns the schema of the result set of 
executing an already
+       // prepared statement with the provided statement handle.
+       GetSchemaPreparedStatement(context.Context, PreparedStatementQuery, 
*flight.FlightDescriptor) (*flight.SchemaResult, error)
        // DoGetPreparedStatement returns a stream containing the results from 
executing
        // a prepared statement query with the provided statement handle.
        DoGetPreparedStatement(context.Context, PreparedStatementQuery) 
(*arrow.Schema, <-chan flight.StreamChunk, error)
@@ -519,6 +532,53 @@ func (f *flightSqlServer) GetFlightInfo(ctx 
context.Context, request *flight.Fli
        return nil, status.Error(codes.InvalidArgument, "requested command is 
invalid")
 }
 
+func (f *flightSqlServer) GetSchema(ctx context.Context, request 
*flight.FlightDescriptor) (*flight.SchemaResult, error) {
+       var (
+               anycmd anypb.Any
+               cmd    proto.Message
+               err    error
+       )
+       if err = proto.Unmarshal(request.Cmd, &anycmd); err != nil {
+               return nil, status.Errorf(codes.InvalidArgument, "unable to 
parse command: %s", err.Error())
+       }
+
+       if cmd, err = anycmd.UnmarshalNew(); err != nil {
+               return nil, status.Errorf(codes.InvalidArgument, "could not 
unmarshal Any to a command type: %s", err.Error())
+       }
+
+       switch cmd := cmd.(type) {
+       case *pb.CommandStatementQuery:
+               return f.srv.GetSchemaStatement(ctx, cmd, request)
+       case *pb.CommandPreparedStatementQuery:
+               return f.srv.GetSchemaPreparedStatement(ctx, cmd, request)
+       case *pb.CommandGetCatalogs:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.Catalogs, f.mem)}, nil
+       case *pb.CommandGetDbSchemas:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.DBSchemas, f.mem)}, nil
+       case *pb.CommandGetTables:
+               if cmd.GetIncludeSchema() {
+                       return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.TablesWithIncludedSchema, f.mem)}, nil
+               }
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.Tables, f.mem)}, nil
+       case *pb.CommandGetTableTypes:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.TableTypes, f.mem)}, nil
+       case *pb.CommandGetXdbcTypeInfo:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.XdbcTypeInfo, f.mem)}, nil
+       case *pb.CommandGetSqlInfo:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.SqlInfo, f.mem)}, nil
+       case *pb.CommandGetPrimaryKeys:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.PrimaryKeys, f.mem)}, nil
+       case *pb.CommandGetExportedKeys:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.ExportedKeys, f.mem)}, nil
+       case *pb.CommandGetImportedKeys:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.ImportedKeys, f.mem)}, nil
+       case *pb.CommandGetCrossReference:
+               return &flight.SchemaResult{Schema: 
flight.SerializeSchema(schema_ref.CrossReference, f.mem)}, nil
+       }
+
+       return nil, status.Errorf(codes.InvalidArgument, "requested command is 
invalid: %s", anycmd.GetTypeUrl())
+}
+
 func (f *flightSqlServer) DoGet(request *flight.Ticket, stream 
flight.FlightService_DoGetServer) (err error) {
        var (
                anycmd anypb.Any
diff --git a/go/arrow/internal/flight_integration/scenario.go 
b/go/arrow/internal/flight_integration/scenario.go
index c89334002d..4e96d7100a 100644
--- a/go/arrow/internal/flight_integration/scenario.go
+++ b/go/arrow/internal/flight_integration/scenario.go
@@ -599,6 +599,22 @@ func (m *flightSqlScenarioTester) validate(expected 
*arrow.Schema, result *fligh
        if !expected.Equal(rdr.Schema()) {
                return fmt.Errorf("expected: %s, got: %s", expected, 
rdr.Schema())
        }
+       for {
+               _, err := rdr.Read()
+               if err == io.EOF { break }
+               if err != nil { return err }
+       }
+       return nil
+}
+
+func (m *flightSqlScenarioTester) validateSchema(expected *arrow.Schema, 
result *flight.SchemaResult) error {
+       schema, err := flight.DeserializeSchema(result.GetSchema(), 
memory.DefaultAllocator)
+       if err != nil {
+               return err
+       }
+       if !expected.Equal(schema) {
+               return fmt.Errorf("expected: %s, got: %s", expected, schema)
+       }
        return nil
 }
 
@@ -626,6 +642,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err := client.GetCatalogsSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err := m.validateSchema(schema_ref.Catalogs, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetDBSchemas(ctx, 
&flightsql.GetDBSchemasOpts{Catalog: &catalog, DbSchemaFilterPattern: 
&dbSchemaFilterPattern})
        if err != nil {
                return err
@@ -634,6 +658,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetDBSchemasSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.DBSchemas, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetTables(ctx, &flightsql.GetTablesOpts{Catalog: 
&catalog, DbSchemaFilterPattern: &dbSchemaFilterPattern, 
TableNameFilterPattern: &tableFilterPattern, IncludeSchema: true, TableTypes: 
tableTypes})
        if err != nil {
                return err
@@ -642,6 +674,22 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetTablesSchema(ctx, 
&flightsql.GetTablesOpts{IncludeSchema: true})
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.TablesWithIncludedSchema, schema); 
err != nil {
+               return err
+       }
+
+       schema, err = client.GetTablesSchema(ctx, 
&flightsql.GetTablesOpts{IncludeSchema: false})
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.Tables, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetTableTypes(ctx)
        if err != nil {
                return err
@@ -650,6 +698,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetTableTypesSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.TableTypes, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetPrimaryKeys(ctx, ref)
        if err != nil {
                return err
@@ -658,6 +714,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetPrimaryKeysSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.PrimaryKeys, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetExportedKeys(ctx, ref)
        if err != nil {
                return err
@@ -666,6 +730,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetExportedKeysSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.ExportedKeys, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetImportedKeys(ctx, ref)
        if err != nil {
                return err
@@ -674,6 +746,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetImportedKeysSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.ImportedKeys, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetCrossReference(ctx, pkRef, fkRef)
        if err != nil {
                return err
@@ -682,6 +762,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetCrossReferenceSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.CrossReference, schema); err != 
nil {
+               return err
+       }
+
        info, err = client.GetXdbcTypeInfo(ctx, nil)
        if err != nil {
                return err
@@ -690,6 +778,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetXdbcTypeInfoSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.XdbcTypeInfo, schema); err != nil {
+               return err
+       }
+
        info, err = client.GetSqlInfo(ctx, 
[]flightsql.SqlInfo{flightsql.SqlInfoFlightSqlServerName, 
flightsql.SqlInfoFlightSqlServerReadOnly})
        if err != nil {
                return err
@@ -698,6 +794,14 @@ func (m *flightSqlScenarioTester) 
ValidateMetadataRetrieval(client *flightsql.Cl
                return err
        }
 
+       schema, err = client.GetSqlInfoSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(schema_ref.SqlInfo, schema); err != nil {
+               return err
+       }
+
        return nil
 }
 
@@ -711,6 +815,14 @@ func (m *flightSqlScenarioTester) 
ValidateStatementExecution(client *flightsql.C
                return err
        }
 
+       schema, err := client.GetExecuteSchema(ctx, "SELECT STATEMENT")
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(QuerySchema, schema); err != nil {
+               return err
+       }
+
        updateResult, err := client.ExecuteUpdate(ctx, "UPDATE STATEMENT")
        if err != nil {
                return err
@@ -740,6 +852,13 @@ func (m *flightSqlScenarioTester) 
ValidatePreparedStatementExecution(client *fli
        if err = m.validate(QuerySchema, info, client); err != nil {
                return err
        }
+       schema, err := prepared.GetSchema(ctx)
+       if err != nil {
+               return err
+       }
+       if err = m.validateSchema(QuerySchema, schema); err != nil {
+               return err
+       }
 
        if err = prepared.Close(ctx); err != nil {
                return err
@@ -762,9 +881,7 @@ func (m *flightSqlScenarioTester) 
ValidatePreparedStatementExecution(client *fli
 
 func (m *flightSqlScenarioTester) doGetForTestCase(schema *arrow.Schema) chan 
flight.StreamChunk {
        ch := make(chan flight.StreamChunk)
-       go func() {
-               ch <- flight.StreamChunk{Data: array.NewRecord(schema, 
[]arrow.Array{}, 0)}
-       }()
+       close(ch)
        return ch
 }
 
@@ -789,6 +906,10 @@ func (m *flightSqlScenarioTester) 
GetFlightInfoStatement(ctx context.Context, cm
        }, nil
 }
 
+func (m *flightSqlScenarioTester) GetSchemaStatement(ctx context.Context, cmd 
flightsql.StatementQuery, desc *flight.FlightDescriptor) (*flight.SchemaResult, 
error) {
+       return &flight.SchemaResult{Schema: flight.SerializeSchema(QuerySchema, 
memory.DefaultAllocator)}, nil
+}
+
 func (m *flightSqlScenarioTester) DoGetStatement(ctx context.Context, cmd 
flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, 
error) {
        return QuerySchema, m.doGetForTestCase(QuerySchema), nil
 }
@@ -801,6 +922,10 @@ func (m *flightSqlScenarioTester) 
GetFlightInfoPreparedStatement(_ context.Conte
        return m.flightInfoForCommand(desc, QuerySchema), nil
 }
 
+func (m *flightSqlScenarioTester) GetSchemaPreparedStatement(ctx 
context.Context, cmd flightsql.PreparedStatementQuery, desc 
*flight.FlightDescriptor) (*flight.SchemaResult, error) {
+       return &flight.SchemaResult{Schema: flight.SerializeSchema(QuerySchema, 
memory.DefaultAllocator)}, nil
+}
+
 func (m *flightSqlScenarioTester) DoGetPreparedStatement(_ context.Context, 
cmd flightsql.PreparedStatementQuery) (*arrow.Schema, <-chan 
flight.StreamChunk, error) {
        return QuerySchema, m.doGetForTestCase(QuerySchema), nil
 }
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenario.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenario.java
index cf17349064..19c1378cfe 100644
--- 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenario.java
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenario.java
@@ -26,6 +26,7 @@ import org.apache.arrow.flight.FlightProducer;
 import org.apache.arrow.flight.FlightServer;
 import org.apache.arrow.flight.FlightStream;
 import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.SchemaResult;
 import org.apache.arrow.flight.Ticket;
 import org.apache.arrow.flight.sql.FlightSqlClient;
 import org.apache.arrow.flight.sql.FlightSqlProducer;
@@ -72,32 +73,52 @@ public class FlightSqlScenario implements Scenario {
 
     validate(FlightSqlProducer.Schemas.GET_CATALOGS_SCHEMA, 
sqlClient.getCatalogs(options),
         sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_CATALOGS_SCHEMA, 
sqlClient.getCatalogsSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_SCHEMAS_SCHEMA,
         sqlClient.getSchemas("catalog", "db_schema_filter_pattern", options),
         sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_SCHEMAS_SCHEMA, 
sqlClient.getSchemasSchema());
+
     validate(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA,
         sqlClient.getTables("catalog", "db_schema_filter_pattern", 
"table_filter_pattern",
             Arrays.asList("table", "view"), true, options), sqlClient);
-    validate(FlightSqlProducer.Schemas.GET_TABLE_TYPES_SCHEMA, 
sqlClient.getTableTypes(options),
-        sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA,
+        sqlClient.getTablesSchema(/*includeSchema*/true, options));
+    validateSchema(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA_NO_SCHEMA,
+        sqlClient.getTablesSchema(/*includeSchema*/false, options));
+
+    validate(FlightSqlProducer.Schemas.GET_TABLE_TYPES_SCHEMA, 
sqlClient.getTableTypes(options), sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_TABLE_TYPES_SCHEMA, 
sqlClient.getTableTypesSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_PRIMARY_KEYS_SCHEMA,
         sqlClient.getPrimaryKeys(TableRef.of("catalog", "db_schema", "table"), 
options),
         sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_PRIMARY_KEYS_SCHEMA, 
sqlClient.getPrimaryKeysSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_EXPORTED_KEYS_SCHEMA,
         sqlClient.getExportedKeys(TableRef.of("catalog", "db_schema", 
"table"), options),
         sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_EXPORTED_KEYS_SCHEMA, 
sqlClient.getExportedKeysSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_IMPORTED_KEYS_SCHEMA,
         sqlClient.getImportedKeys(TableRef.of("catalog", "db_schema", 
"table"), options),
         sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_IMPORTED_KEYS_SCHEMA, 
sqlClient.getImportedKeysSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_CROSS_REFERENCE_SCHEMA,
         sqlClient.getCrossReference(TableRef.of("pk_catalog", "pk_db_schema", 
"pk_table"),
             TableRef.of("fk_catalog", "fk_db_schema", "fk_table"), options),
         sqlClient);
-    validate(FlightSqlProducer.Schemas.GET_TYPE_INFO_SCHEMA,
-        sqlClient.getXdbcTypeInfo(options), sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_CROSS_REFERENCE_SCHEMA, 
sqlClient.getCrossReferenceSchema(options));
+
+    validate(FlightSqlProducer.Schemas.GET_TYPE_INFO_SCHEMA, 
sqlClient.getXdbcTypeInfo(options), sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_TYPE_INFO_SCHEMA, 
sqlClient.getXdbcTypeInfoSchema(options));
+
     validate(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA,
         sqlClient.getSqlInfo(new FlightSql.SqlInfo[] 
{FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME,
             FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY}, options), 
sqlClient);
+    validateSchema(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA, 
sqlClient.getSqlInfoSchema(options));
   }
 
   private void validateStatementExecution(FlightSqlClient sqlClient) throws 
Exception {
@@ -105,6 +126,8 @@ public class FlightSqlScenario implements Scenario {
 
     validate(FlightSqlScenarioProducer.getQuerySchema(),
         sqlClient.execute("SELECT STATEMENT", options), sqlClient);
+    validateSchema(FlightSqlScenarioProducer.getQuerySchema(),
+        sqlClient.getExecuteSchema("SELECT STATEMENT", options));
 
     IntegrationAssertions.assertEquals(sqlClient.executeUpdate("UPDATE 
STATEMENT", options),
         UPDATE_STATEMENT_EXPECTED_ROWS);
@@ -122,6 +145,7 @@ public class FlightSqlScenario implements Scenario {
 
       validate(FlightSqlScenarioProducer.getQuerySchema(), 
preparedStatement.execute(options),
           sqlClient);
+      validateSchema(FlightSqlScenarioProducer.getQuerySchema(), 
preparedStatement.fetchSchema());
     }
 
     try (FlightSqlClient.PreparedStatement preparedStatement = 
sqlClient.prepare(
@@ -139,4 +163,8 @@ public class FlightSqlScenario implements Scenario {
       IntegrationAssertions.assertEquals(expectedSchema, actualSchema);
     }
   }
+
+  private void validateSchema(Schema expected, SchemaResult actual) {
+    IntegrationAssertions.assertEquals(expected, actual.getSchema());
+  }
 }
diff --git 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
index 7db99187c4..33d62b650e 100644
--- 
a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
+++ 
b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenarioProducer.java
@@ -125,9 +125,18 @@ public class FlightSqlScenarioProducer implements 
FlightSqlProducer {
     return getFlightInfoForSchema(command, descriptor, getQuerySchema());
   }
 
+  @Override
+  public SchemaResult 
getSchemaPreparedStatement(FlightSql.CommandPreparedStatementQuery command, 
CallContext context,
+                                                 FlightDescriptor descriptor) {
+    
IntegrationAssertions.assertEquals(command.getPreparedStatementHandle().toStringUtf8(),
+        "SELECT PREPARED STATEMENT HANDLE");
+    return new SchemaResult(getQuerySchema());
+  }
+
   @Override
   public SchemaResult getSchemaStatement(FlightSql.CommandStatementQuery 
command,
                                          CallContext context, FlightDescriptor 
descriptor) {
+    IntegrationAssertions.assertEquals(command.getQuery(), "SELECT STATEMENT");
     return new SchemaResult(getQuerySchema());
   }
 
diff --git 
a/java/flight/flight-integration-tests/src/main/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
 
b/java/flight/flight-integration-tests/src/main/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
new file mode 100644
index 0000000000..dfb9a81085
--- /dev/null
+++ 
b/java/flight/flight-integration-tests/src/main/test/java/org/apache/arrow/flight/integration/tests/IntegrationTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.integration.tests;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Run the integration test scenarios in-process.
+ */
+class IntegrationTest {
+  @Test
+  void authBasicProto() throws Exception {
+    testScenario("auth:basic_proto");
+  }
+
+  @Test
+  void middleware() throws Exception {
+    testScenario("middleware");
+  }
+
+  @Test
+  void flightSql() throws Exception {
+    testScenario("flight_sql");
+  }
+
+  void testScenario(String scenarioName) throws Exception {
+    try (final BufferAllocator allocator = new RootAllocator()) {
+      final FlightServer.Builder builder = FlightServer.builder()
+          .allocator(allocator)
+          .location(Location.forGrpcInsecure("0.0.0.0", 0));
+      final Scenario scenario = Scenarios.getScenario(scenarioName);
+      scenario.buildServer(builder);
+      builder.producer(scenario.producer(allocator, 
Location.forGrpcInsecure("0.0.0.0", 0)));
+
+      try (final FlightServer server = builder.build()) {
+        server.start();
+
+        final Location location = Location.forGrpcInsecure("localhost", 
server.getPort());
+        try (final FlightClient client = FlightClient.builder(allocator, 
location).build()) {
+          scenario.client(allocator, location, client);
+        }
+      }
+    }
+  }
+}
diff --git 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
index dd9480f400..f1f07a1588 100644
--- 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
+++ 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
@@ -97,6 +97,16 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of the result set of a query.
+   */
+  public SchemaResult getExecuteSchema(final String query, final CallOption... 
options) {
+    final CommandStatementQuery.Builder builder = 
CommandStatementQuery.newBuilder();
+    builder.setQuery(query);
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(builder.build()).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Execute an update query on the server.
    *
@@ -137,6 +147,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getCatalogs(CallOption...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_CATALOGS_SCHEMA}.
+   */
+  public SchemaResult getCatalogsSchema(final CallOption... options) {
+    final CommandGetCatalogs command = CommandGetCatalogs.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Request a list of schemas.
    *
@@ -160,6 +181,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getSchemas(String, String, CallOption...)} from 
the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_SCHEMAS_SCHEMA}.
+   */
+  public SchemaResult getSchemasSchema(final CallOption... options) {
+    final CommandGetDbSchemas command = 
CommandGetDbSchemas.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Get schema for a stream.
    *
@@ -231,6 +263,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getSqlInfo(SqlInfo...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_SQL_INFO_SCHEMA}.
+   */
+  public SchemaResult getSqlInfoSchema(final CallOption... options) {
+    final CommandGetSqlInfo command = CommandGetSqlInfo.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Request the information about the data types supported related to
    * a filter data type.
@@ -261,6 +304,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getXdbcTypeInfo(CallOption...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_TYPE_INFO_SCHEMA}.
+   */
+  public SchemaResult getXdbcTypeInfoSchema(final CallOption... options) {
+    final CommandGetXdbcTypeInfo command = 
CommandGetXdbcTypeInfo.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Request a list of tables.
    *
@@ -298,6 +352,18 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getTables(String, String, String, List, 
boolean, CallOption...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_TABLES_SCHEMA} or
+   * {@link FlightSqlProducer.Schemas#GET_TABLES_SCHEMA_NO_SCHEMA}.
+   */
+  public SchemaResult getTablesSchema(boolean includeSchema, final 
CallOption... options) {
+    final CommandGetTables command = 
CommandGetTables.newBuilder().setIncludeSchema(includeSchema).build();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Request the primary keys for a table.
    *
@@ -323,6 +389,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getPrimaryKeys(TableRef, CallOption...)} from 
the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_PRIMARY_KEYS_SCHEMA}.
+   */
+  public SchemaResult getPrimaryKeysSchema(final CallOption... options) {
+    final CommandGetPrimaryKeys command = 
CommandGetPrimaryKeys.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Retrieves a description about the foreign key columns that reference the 
primary key columns of the given table.
    *
@@ -350,6 +427,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getExportedKeys(TableRef, CallOption...)} from 
the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_EXPORTED_KEYS_SCHEMA}.
+   */
+  public SchemaResult getExportedKeysSchema(final CallOption... options) {
+    final CommandGetExportedKeys command = 
CommandGetExportedKeys.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Retrieves the foreign key columns for the given table.
    *
@@ -378,6 +466,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getImportedKeys(TableRef, CallOption...)} from 
the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_IMPORTED_KEYS_SCHEMA}.
+   */
+  public SchemaResult getImportedKeysSchema(final CallOption... options) {
+    final CommandGetImportedKeys command = 
CommandGetImportedKeys.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Retrieves a description of the foreign key columns that reference the 
given table's
    * primary key columns (the foreign keys exported by a table).
@@ -417,6 +516,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getCrossReference(TableRef, TableRef, 
CallOption...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_CROSS_REFERENCE_SCHEMA}.
+   */
+  public SchemaResult getCrossReferenceSchema(final CallOption... options) {
+    final CommandGetCrossReference command = 
CommandGetCrossReference.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Request a list of table types.
    *
@@ -429,6 +539,17 @@ public class FlightSqlClient implements AutoCloseable {
     return client.getInfo(descriptor, options);
   }
 
+  /**
+   * Get the schema of {@link #getTableTypes(CallOption...)} from the server.
+   *
+   * <p>Should be identical to {@link 
FlightSqlProducer.Schemas#GET_TABLE_TYPES_SCHEMA}.
+   */
+  public SchemaResult getTableTypesSchema(final CallOption... options) {
+    final CommandGetTableTypes command = 
CommandGetTableTypes.getDefaultInstance();
+    final FlightDescriptor descriptor = 
FlightDescriptor.command(Any.pack(command).toByteArray());
+    return client.getSchema(descriptor, options);
+  }
+
   /**
    * Create a prepared statement on the server.
    *
@@ -534,6 +655,20 @@ public class FlightSqlClient implements AutoCloseable {
       return parameterSchema;
     }
 
+    /**
+     * Get the schema of the result set (should be identical to {@link 
#getResultSetSchema()}).
+     */
+    public SchemaResult fetchSchema(CallOption... options) {
+      checkOpen();
+
+      final FlightDescriptor descriptor = FlightDescriptor
+          .command(Any.pack(CommandPreparedStatementQuery.newBuilder()
+                  
.setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle())
+                  .build())
+              .toByteArray());
+      return client.getSchema(descriptor, options);
+    }
+
     private Schema deserializeSchema(final ByteString bytes) {
       try {
         return bytes.isEmpty() ?
diff --git 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java
 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java
index c617c6a03e..4226ec9e22 100644
--- 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java
+++ 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java
@@ -147,26 +147,32 @@ public interface FlightSqlProducer extends 
FlightProducer, AutoCloseable {
     if (command.is(CommandStatementQuery.class)) {
       return getSchemaStatement(
           FlightSqlUtils.unpackOrThrow(command, CommandStatementQuery.class), 
context, descriptor);
+    } else if (command.is(CommandPreparedStatementQuery.class)) {
+      return getSchemaPreparedStatement(
+          FlightSqlUtils.unpackOrThrow(command, 
CommandPreparedStatementQuery.class), context, descriptor);
     } else if (command.is(CommandGetCatalogs.class)) {
       return new SchemaResult(Schemas.GET_CATALOGS_SCHEMA);
+    } else if (command.is(CommandGetCrossReference.class)) {
+      return new SchemaResult(Schemas.GET_CROSS_REFERENCE_SCHEMA);
     } else if (command.is(CommandGetDbSchemas.class)) {
       return new SchemaResult(Schemas.GET_SCHEMAS_SCHEMA);
+    } else if (command.is(CommandGetExportedKeys.class)) {
+      return new SchemaResult(Schemas.GET_EXPORTED_KEYS_SCHEMA);
+    } else if (command.is(CommandGetImportedKeys.class)) {
+      return new SchemaResult(Schemas.GET_IMPORTED_KEYS_SCHEMA);
+    } else if (command.is(CommandGetPrimaryKeys.class)) {
+      return new SchemaResult(Schemas.GET_PRIMARY_KEYS_SCHEMA);
     } else if (command.is(CommandGetTables.class)) {
-      return new SchemaResult(Schemas.GET_TABLES_SCHEMA);
+      if (FlightSqlUtils.unpackOrThrow(command, 
CommandGetTables.class).getIncludeSchema()) {
+        return new SchemaResult(Schemas.GET_TABLES_SCHEMA);
+      }
+      return new SchemaResult(Schemas.GET_TABLES_SCHEMA_NO_SCHEMA);
     } else if (command.is(CommandGetTableTypes.class)) {
       return new SchemaResult(Schemas.GET_TABLE_TYPES_SCHEMA);
     } else if (command.is(CommandGetSqlInfo.class)) {
       return new SchemaResult(Schemas.GET_SQL_INFO_SCHEMA);
     } else if (command.is(CommandGetXdbcTypeInfo.class)) {
       return new SchemaResult(Schemas.GET_TYPE_INFO_SCHEMA);
-    } else if (command.is(CommandGetPrimaryKeys.class)) {
-      return new SchemaResult(Schemas.GET_PRIMARY_KEYS_SCHEMA);
-    } else if (command.is(CommandGetImportedKeys.class)) {
-      return new SchemaResult(Schemas.GET_IMPORTED_KEYS_SCHEMA);
-    } else if (command.is(CommandGetExportedKeys.class)) {
-      return new SchemaResult(Schemas.GET_EXPORTED_KEYS_SCHEMA);
-    } else if (command.is(CommandGetCrossReference.class)) {
-      return new SchemaResult(Schemas.GET_CROSS_REFERENCE_SCHEMA);
     }
 
     throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid command 
provided.").toRuntimeException();
@@ -336,16 +342,31 @@ public interface FlightSqlProducer extends 
FlightProducer, AutoCloseable {
                                             CallContext context, 
FlightDescriptor descriptor);
 
   /**
-   * Gets schema about a particular SQL query based data stream.
+   * Get the schema of the result set of a query.
    *
-   * @param command    The sql command to generate the data stream.
+   * @param command    The SQL query.
    * @param context    Per-call context.
    * @param descriptor The descriptor identifying the data stream.
-   * @return Schema for the stream.
+   * @return the schema of the result set.
    */
   SchemaResult getSchemaStatement(CommandStatementQuery command, CallContext 
context,
                                   FlightDescriptor descriptor);
 
+  /**
+   * Get the schema of the result set of a prepared statement.
+   *
+   * @param command    The prepared statement handle.
+   * @param context    Per-call context.
+   * @param descriptor The descriptor identifying the data stream.
+   * @return the schema of the result set.
+   */
+  default SchemaResult 
getSchemaPreparedStatement(CommandPreparedStatementQuery command, CallContext 
context,
+                                  FlightDescriptor descriptor) {
+    throw CallStatus.UNIMPLEMENTED
+        .withDescription("GetSchema with CommandPreparedStatementQuery is not 
implemented")
+        .toRuntimeException();
+  }
+
   /**
    * Returns data for a SQL query based data stream.
    * @param ticket   Ticket message containing the statement handle.
diff --git 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java
 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java
index 25affa8f08..e461515c40 100644
--- 
a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java
+++ 
b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java
@@ -76,7 +76,7 @@ public final class FlightSqlUtils {
       return source.unpack(as);
     } catch (final InvalidProtocolBufferException e) {
       throw CallStatus.INVALID_ARGUMENT
-          .withDescription("Provided message cannot be unpacked as desired 
type.")
+          .withDescription("Provided message cannot be unpacked as " + 
as.getName() + ": " + e)
           .withCause(e)
           .toRuntimeException();
     }

Reply via email to