lidavidm commented on code in PR #38385:
URL: https://github.com/apache/arrow/pull/38385#discussion_r1382045726
##########
cpp/src/arrow/flight/sql/client.cc:
##########
@@ -256,6 +256,88 @@ arrow::Result<int64_t>
FlightSqlClient::ExecuteSubstraitUpdate(
return update_result.record_count();
}
+arrow::Result<int64_t> FlightSqlClient::ExecuteIngest(
+ const FlightCallOptions& options,
+ const std::shared_ptr<RecordBatchReader>& reader,
+ const IngestMode& mode,
+ const std::string& table, const std::string& schema,
+ const std::string& catalog, const bool temporary,
+ const Transaction& transaction,
+ const std::map<std::string, std::string>& ingest_options) {
+ flight_sql_pb::CommandStatementIngest command;
+
+ flight_sql_pb::CommandStatementIngest_IngestMode pb_ingest_mode;
+ switch (mode)
+ {
Review Comment:
Make sure to run the linter/formatter; for example, the opening brace should be on the same line as `switch (mode)`.
##########
cpp/src/arrow/flight/integration_tests/test_integration.cc:
##########
@@ -1928,6 +1929,122 @@ class FlightSqlExtensionScenario : public
FlightSqlScenario {
return Status::OK();
}
};
+
+std::shared_ptr<Schema> getIngestSchema () {
Review Comment:
```suggestion
std::shared_ptr<Schema> GetIngestSchema() {
```
The style guide uses UpperCamelCase for function names.
##########
cpp/src/arrow/flight/sql/client.cc:
##########
@@ -256,6 +256,88 @@ arrow::Result<int64_t>
FlightSqlClient::ExecuteSubstraitUpdate(
return update_result.record_count();
}
+arrow::Result<int64_t> FlightSqlClient::ExecuteIngest(
+ const FlightCallOptions& options,
+ const std::shared_ptr<RecordBatchReader>& reader,
+ const IngestMode& mode,
+ const std::string& table, const std::string& schema,
+ const std::string& catalog, const bool temporary,
+ const Transaction& transaction,
+ const std::map<std::string, std::string>& ingest_options) {
+ flight_sql_pb::CommandStatementIngest command;
+
+ flight_sql_pb::CommandStatementIngest_IngestMode pb_ingest_mode;
+ switch (mode)
+ {
+ case IngestMode::Unspecified:
+ pb_ingest_mode =
flight_sql_pb::CommandStatementIngest_IngestMode_INGEST_MODE_UNSPECIFIED;
+ break;
+ case IngestMode::Append:
+ pb_ingest_mode =
flight_sql_pb::CommandStatementIngest_IngestMode_INGEST_MODE_APPEND;
+ break;
+ case IngestMode::Create:
+ pb_ingest_mode =
flight_sql_pb::CommandStatementIngest_IngestMode_INGEST_MODE_CREATE;
+ break;
+ case IngestMode::CreateAppend:
+ pb_ingest_mode =
flight_sql_pb::CommandStatementIngest_IngestMode_INGEST_MODE_CREATE_APPEND;
+ break;
+ case IngestMode::Replace:
+ pb_ingest_mode =
flight_sql_pb::CommandStatementIngest_IngestMode_INGEST_MODE_REPLACE;
+ break;
+
+ default:
+ break;
+ }
+ command.set_mode(pb_ingest_mode);
+ command.set_table(table);
+
+ if (!schema.empty()) {
+ command.set_schema(schema);
+ }
+
+ if (!catalog.empty()) {
+ command.set_catalog(catalog);
+ }
+
+ command.set_temporary(temporary);
+
+ if (transaction.is_valid()) {
+ command.set_transaction_id(transaction.transaction_id());
+ }
+
+ auto command_options = command.mutable_options();
+ for (auto &[key, val] : ingest_options) {
Review Comment:
```suggestion
for (const auto& [key, val] : ingest_options) {
```
nit: iterate by const reference, since the options are only read here.
##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -101,6 +110,26 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
const FlightCallOptions& options, const SubstraitPlan& plan,
const Transaction& transaction = no_transaction());
+ /// \brief Execute a bulk ingestion to the server.
+ /// \param[in] options RPC-layer hints for this call.
+ /// \param[in] reader The records to ingest.
+ /// \param[in] mode The ingestion behavior.
+ /// \param[in] table The destination table to load into.
+ /// \param[in] schema The DB schema of the destination table.
+ /// \param[in] catalog The catalog of the destination table.
+ /// \param[in] temporary Use a temporary table.
+ /// \param[in] transaction Ingest as part of this transaction.
+ /// \param[in] ingest_options Additional, backend-specific options.
+ /// \return The number of rows ingested to the server.
+ arrow::Result<int64_t> ExecuteIngest(
+ const FlightCallOptions& options,
+ const std::shared_ptr<RecordBatchReader>& reader,
+ const IngestMode& mode,
+ const std::string& table, const std::string& schema = "",
Review Comment:
Let's use `std::optional<std::string>` for optional arguments instead of defaulting them to an empty string.
##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -85,6 +85,32 @@ struct ARROW_FLIGHT_SQL_EXPORT PreparedStatementUpdate {
std::string prepared_statement_handle;
};
+/// \brief A bulk ingestion request
+struct ARROW_FLIGHT_SQL_EXPORT StatementIngest {
+ enum IngestMode {
+ kUnspecified,
+ kCreate,
+ kAppend,
+ kReplace,
+ kCreateAppend,
+ };
+
+ /// \brief The ingestion behavior.
+ IngestMode mode;
+ /// \brief The destination table to load into.
+ std::string table;
+ /// \brief The DB schema of the destination table.
+ std::string schema;
Review Comment:
Use `std::optional<std::string>` for optional fields.
##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -85,6 +85,32 @@ struct ARROW_FLIGHT_SQL_EXPORT PreparedStatementUpdate {
std::string prepared_statement_handle;
};
+/// \brief A bulk ingestion request
+struct ARROW_FLIGHT_SQL_EXPORT StatementIngest {
+ enum IngestMode {
+ kUnspecified,
+ kCreate,
+ kAppend,
+ kReplace,
+ kCreateAppend,
+ };
+
+ /// \brief The ingestion behavior.
+ IngestMode mode;
+ /// \brief The destination table to load into.
+ std::string table;
+ /// \brief The DB schema of the destination table.
+ std::string schema;
Review Comment:
Protobuf should expose `has_schema()` etc., so you can check whether the field was actually
absent or merely set to an empty string.
##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -36,6 +36,15 @@ class PreparedStatement;
class Transaction;
class Savepoint;
+/// \brief Options for ingestion behavior.
+enum ARROW_FLIGHT_SQL_EXPORT IngestMode {
Review Comment:
```suggestion
enum class IngestMode {
```
##########
format/FlightSql.proto:
##########
@@ -149,6 +149,17 @@ enum SqlInfo {
*/
FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT = 101;
+ /*
+ * Retrieves a boolean value indicating whether transactions are supported
for bulk ingestion. If not, invoking
+ * the method commit in the context of a bulk ingestion is a noop, and the
isolation level is
+ * `arrow.flight.protocol.sql.SqlTransactionIsolationLevel.TRANSACTION_NONE`.
+ *
+ * Returns:
+ * - false: if bulk ingestion transactions are unsupported;
+ * - true: if bulk ingestion transactions are supported.
+ */
+ FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED = 102;
Review Comment:
Sorry, I should have mentioned this earlier: we should also add a SqlInfo value indicating
whether the bulk ingestion feature itself is supported.
##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -36,6 +36,15 @@ class PreparedStatement;
class Transaction;
class Savepoint;
+/// \brief Options for ingestion behavior.
+enum ARROW_FLIGHT_SQL_EXPORT IngestMode {
+ Unspecified,
Review Comment:
```suggestion
kUnspecified,
```
to follow the style guide
##########
cpp/src/arrow/flight/sql/server.h:
##########
@@ -85,6 +85,32 @@ struct ARROW_FLIGHT_SQL_EXPORT PreparedStatementUpdate {
std::string prepared_statement_handle;
};
+/// \brief A bulk ingestion request
+struct ARROW_FLIGHT_SQL_EXPORT StatementIngest {
+ enum IngestMode {
Review Comment:
Could we move the enum to types.h so that the client and server share a single definition?
##########
cpp/src/arrow/flight/sql/client.h:
##########
@@ -101,6 +110,26 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
const FlightCallOptions& options, const SubstraitPlan& plan,
const Transaction& transaction = no_transaction());
+ /// \brief Execute a bulk ingestion to the server.
+ /// \param[in] options RPC-layer hints for this call.
+ /// \param[in] reader The records to ingest.
+ /// \param[in] mode The ingestion behavior.
+ /// \param[in] table The destination table to load into.
+ /// \param[in] schema The DB schema of the destination table.
+ /// \param[in] catalog The catalog of the destination table.
+ /// \param[in] temporary Use a temporary table.
+ /// \param[in] transaction Ingest as part of this transaction.
+ /// \param[in] ingest_options Additional, backend-specific options.
+ /// \return The number of rows ingested to the server.
+ arrow::Result<int64_t> ExecuteIngest(
+ const FlightCallOptions& options,
+ const std::shared_ptr<RecordBatchReader>& reader,
+ const IngestMode& mode,
+ const std::string& table, const std::string& schema = "",
+ const std::string& catalog = "", const bool temporary = false,
+ const Transaction& transaction = no_transaction(),
+ const std::map<std::string, std::string>& ingest_options = {});
Review Comment:
nit: prefer `std::unordered_map`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]