This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 34b364f4 feat(c/driver): support ingesting into temporary tables
(#1057)
34b364f4 is described below
commit 34b364f488aa813bea52e83bedb286dc29d9585d
Author: David Li <[email protected]>
AuthorDate: Tue Sep 12 16:31:19 2023 -0400
feat(c/driver): support ingesting into temporary tables (#1057)
- If not specified, drivers will explicitly favor non-temporary tables.
Fixes https://github.com/apache/arrow-adbc/issues/1018.
---
c/driver/common/options.h | 11 +
c/driver/postgresql/CMakeLists.txt | 1 +
c/driver/postgresql/connection.cc | 143 +------
c/driver/postgresql/error.cc | 3 +
c/driver/postgresql/postgresql_test.cc | 95 +++--
c/driver/postgresql/result_helper.cc | 68 ++++
c/driver/postgresql/result_helper.h | 133 +++++++
c/driver/postgresql/statement.cc | 63 ++-
c/driver/postgresql/statement.h | 3 +-
c/driver/sqlite/sqlite.c | 35 +-
c/driver/sqlite/sqlite_test.cc | 29 +-
c/driver/sqlite/types.h | 1 +
c/validation/adbc_validation.cc | 424 +++++++++++++++++++++
c/validation/adbc_validation.h | 18 +
.../adbc_driver_manager/__init__.py | 3 +
.../adbc_driver_manager/dbapi.py | 26 +-
python/adbc_driver_postgresql/tests/test_dbapi.py | 74 +++-
r/adbcpostgresql/bootstrap.R | 2 +
r/adbcpostgresql/src/Makevars.in | 1 +
r/adbcpostgresql/src/Makevars.ucrt | 1 +
r/adbcpostgresql/src/Makevars.win | 1 +
21 files changed, 942 insertions(+), 193 deletions(-)
diff --git a/c/driver/common/options.h b/c/driver/common/options.h
index f8b64efa..f42bb090 100644
--- a/c/driver/common/options.h
+++ b/c/driver/common/options.h
@@ -34,6 +34,17 @@ extern "C" {
/// The type is char*.
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA "adbc.ingest.target_db_schema"
+/// \brief Use a temporary table for ingestion.
+///
+/// The value should be ADBC_OPTION_VALUE_ENABLED or
+/// ADBC_OPTION_VALUE_DISABLED (the default).
+///
+/// This cannot be combined with either ADBC_INGEST_OPTION_TARGET_CATALOG or
+/// ADBC_INGEST_OPTION_TARGET_DB_SCHEMA.
+///
+/// The type is char*.
+#define ADBC_INGEST_OPTION_TEMPORARY "adbc.ingest.temporary"
+
#ifdef __cplusplus
}
#endif
diff --git a/c/driver/postgresql/CMakeLists.txt
b/c/driver/postgresql/CMakeLists.txt
index 9cf595a3..6deae291 100644
--- a/c/driver/postgresql/CMakeLists.txt
+++ b/c/driver/postgresql/CMakeLists.txt
@@ -32,6 +32,7 @@ add_arrow_lib(adbc_driver_postgresql
error.cc
database.cc
postgresql.cc
+ result_helper.cc
statement.cc
OUTPUTS
ADBC_LIBRARIES
diff --git a/c/driver/postgresql/connection.cc
b/c/driver/postgresql/connection.cc
index 35b13115..1e2f5c51 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -34,6 +34,7 @@
#include "common/utils.h"
#include "database.h"
#include "error.h"
+#include "result_helper.h"
namespace adbcpq {
namespace {
@@ -48,148 +49,6 @@ static const std::unordered_map<std::string, std::string>
kPgTableTypes = {
{"table", "r"}, {"view", "v"}, {"materialized_view", "m"},
{"toast_table", "t"}, {"foreign_table", "f"}, {"partitioned_table", "p"}};
-/// \brief A single column in a single row of a result set.
-struct PqRecord {
- const char* data;
- const int len;
- const bool is_null;
-
- // XXX: can't use optional due to R
- std::pair<bool, double> ParseDouble() const {
- char* end;
- double result = std::strtod(data, &end);
- if (errno != 0 || end == data) {
- return std::make_pair(false, 0.0);
- }
- return std::make_pair(true, result);
- }
-};
-
-// Used by PqResultHelper to provide index-based access to the records within
each
-// row of a PGresult
-class PqResultRow {
- public:
- PqResultRow(PGresult* result, int row_num) : result_(result),
row_num_(row_num) {
- ncols_ = PQnfields(result);
- }
-
- PqRecord operator[](const int& col_num) {
- assert(col_num < ncols_);
- const char* data = PQgetvalue(result_, row_num_, col_num);
- const int len = PQgetlength(result_, row_num_, col_num);
- const bool is_null = PQgetisnull(result_, row_num_, col_num);
-
- return PqRecord{data, len, is_null};
- }
-
- private:
- PGresult* result_ = nullptr;
- int row_num_;
- int ncols_;
-};
-
-// Helper to manager the lifecycle of a PQResult. The query argument
-// will be evaluated as part of the constructor, with the desctructor handling
cleanup
-// Caller must call Prepare then Execute, checking both for an OK
AdbcStatusCode
-// prior to iterating
-class PqResultHelper {
- public:
- explicit PqResultHelper(PGconn* conn, std::string query, struct AdbcError*
error)
- : conn_(conn), query_(std::move(query)), error_(error) {}
-
- explicit PqResultHelper(PGconn* conn, std::string query,
- std::vector<std::string> param_values, struct
AdbcError* error)
- : conn_(conn),
- query_(std::move(query)),
- param_values_(param_values),
- error_(error) {}
-
- AdbcStatusCode Prepare() {
- // TODO: make stmtName a unique identifier?
- PGresult* result =
- PQprepare(conn_, /*stmtName=*/"", query_.c_str(),
param_values_.size(), NULL);
- if (PQresultStatus(result) != PGRES_COMMAND_OK) {
- AdbcStatusCode code =
- SetError(error_, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
- PQerrorMessage(conn_), query_.c_str());
- PQclear(result);
- return code;
- }
-
- PQclear(result);
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Execute() {
- std::vector<const char*> param_c_strs;
-
- for (size_t index = 0; index < param_values_.size(); index++) {
- param_c_strs.push_back(param_values_[index].c_str());
- }
-
- result_ = PQexecPrepared(conn_, "", param_values_.size(),
param_c_strs.data(), NULL,
- NULL, 0);
-
- ExecStatusType status = PQresultStatus(result_);
- if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
- AdbcStatusCode error =
- SetError(error_, result_, "[libpq] Failed to execute query '%s': %s",
- query_.c_str(), PQerrorMessage(conn_));
- return error;
- }
-
- return ADBC_STATUS_OK;
- }
-
- ~PqResultHelper() {
- if (result_ != nullptr) {
- PQclear(result_);
- }
- }
-
- int NumRows() { return PQntuples(result_); }
-
- int NumColumns() { return PQnfields(result_); }
-
- class iterator {
- const PqResultHelper& outer_;
- int curr_row_ = 0;
-
- public:
- explicit iterator(const PqResultHelper& outer, int curr_row = 0)
- : outer_(outer), curr_row_(curr_row) {}
- iterator& operator++() {
- curr_row_++;
- return *this;
- }
- iterator operator++(int) {
- iterator retval = *this;
- ++(*this);
- return retval;
- }
- bool operator==(iterator other) const {
- return outer_.result_ == other.outer_.result_ && curr_row_ ==
other.curr_row_;
- }
- bool operator!=(iterator other) const { return !(*this == other); }
- PqResultRow operator*() { return PqResultRow(outer_.result_, curr_row_); }
- using iterator_category = std::forward_iterator_tag;
- using difference_type = std::ptrdiff_t;
- using value_type = std::vector<PqResultRow>;
- using pointer = const std::vector<PqResultRow>*;
- using reference = const std::vector<PqResultRow>&;
- };
-
- iterator begin() { return iterator(*this); }
- iterator end() { return iterator(*this, NumRows()); }
-
- private:
- PGresult* result_ = nullptr;
- PGconn* conn_;
- std::string query_;
- std::vector<std::string> param_values_;
- struct AdbcError* error_;
-};
-
class PqGetObjectsHelper {
public:
PqGetObjectsHelper(PGconn* conn, int depth, const char* catalog, const char*
db_schema,
diff --git a/c/driver/postgresql/error.cc b/c/driver/postgresql/error.cc
index 47e04496..ed93d174 100644
--- a/c/driver/postgresql/error.cc
+++ b/c/driver/postgresql/error.cc
@@ -68,6 +68,9 @@ AdbcStatusCode SetError(struct AdbcError* error, PGresult*
result, const char* f
// This can be extended in the future
if (std::strcmp(sqlstate, "57014") == 0) {
code = ADBC_STATUS_CANCELLED;
+ } else if (std::strcmp(sqlstate, "42P01") == 0 ||
+ std::strcmp(sqlstate, "42602") == 0) {
+ code = ADBC_STATUS_NOT_FOUND;
} else if (std::strncmp(sqlstate, "42", 0) == 0) {
// Class 42 — Syntax Error or Access Rule Violation
code = ADBC_STATUS_INVALID_ARGUMENT;
diff --git a/c/driver/postgresql/postgresql_test.cc
b/c/driver/postgresql/postgresql_test.cc
index 4328f8e3..24871e80 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -28,11 +28,13 @@
#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.h>
+#include "common/options.h"
#include "common/utils.h"
#include "database.h"
#include "validation/adbc_validation.h"
#include "validation/adbc_validation_util.h"
+using adbc_validation::Handle;
using adbc_validation::IsOkStatus;
using adbc_validation::IsStatus;
@@ -50,38 +52,35 @@ class PostgresQuirks : public adbc_validation::DriverQuirks
{
AdbcStatusCode DropTable(struct AdbcConnection* connection, const
std::string& name,
struct AdbcError* error) const override {
- struct AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
- if (status != ADBC_STATUS_OK) return status;
+ Handle<struct AdbcStatement> statement;
+ RAISE_ADBC(AdbcStatementNew(connection, &statement.value, error));
std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
- status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
- if (status != ADBC_STATUS_OK) {
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
- }
- status = AdbcStatementExecuteQuery(&statement, nullptr, nullptr, error);
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
+ RAISE_ADBC(AdbcStatementSetSqlQuery(&statement.value, query.c_str(),
error));
+ RAISE_ADBC(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
error));
+ return AdbcStatementRelease(&statement.value, error);
+ }
+
+ AdbcStatusCode DropTempTable(struct AdbcConnection* connection, const
std::string& name,
+ struct AdbcError* error) const override {
+ Handle<struct AdbcStatement> statement;
+ RAISE_ADBC(AdbcStatementNew(connection, &statement.value, error));
+
+ std::string query = "DROP TABLE IF EXISTS pg_temp . \"" + name + "\"";
+ RAISE_ADBC(AdbcStatementSetSqlQuery(&statement.value, query.c_str(),
error));
+ RAISE_ADBC(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
error));
+ return AdbcStatementRelease(&statement.value, error);
}
AdbcStatusCode DropView(struct AdbcConnection* connection, const
std::string& name,
struct AdbcError* error) const override {
- struct AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
- if (status != ADBC_STATUS_OK) return status;
-
- std::string query = "DROP VIEW IF EXISTS " + name;
- status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
- if (status != ADBC_STATUS_OK) {
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
- }
- status = AdbcStatementExecuteQuery(&statement, nullptr, nullptr, error);
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
+ Handle<struct AdbcStatement> statement;
+ RAISE_ADBC(AdbcStatementNew(connection, &statement.value, error));
+
+ std::string query = "DROP VIEW IF EXISTS \"" + name + "\"";
+ RAISE_ADBC(AdbcStatementSetSqlQuery(&statement.value, query.c_str(),
error));
+ RAISE_ADBC(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
error));
+ return AdbcStatementRelease(&statement.value, error);
}
std::string BindParameter(int index) const override {
@@ -113,6 +112,7 @@ class PostgresQuirks : public adbc_validation::DriverQuirks
{
bool supports_bulk_ingest_catalog() const override { return false; }
bool supports_bulk_ingest_db_schema() const override { return true; }
+ bool supports_bulk_ingest_temporary() const override { return true; }
bool supports_cancel() const override { return true; }
bool supports_execute_schema() const override { return true; }
std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
@@ -581,7 +581,7 @@ TEST_F(PostgresConnectionTest, MetadataSetCurrentDbSchema) {
AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM schematable",
&error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
- IsStatus(ADBC_STATUS_INVALID_ARGUMENT, &error));
+ IsStatus(ADBC_STATUS_NOT_FOUND, &error));
// 42P01 = table not found
ASSERT_EQ("42P01", std::string_view(error.sqlstate, 5));
ASSERT_NE(0, AdbcErrorGetDetailCount(&error));
@@ -881,7 +881,8 @@ class PostgresStatementTest : public ::testing::Test,
ADBCV_TEST_STATEMENT(PostgresStatementTest)
TEST_F(PostgresStatementTest, SqlIngestTemporaryTable) {
- ASSERT_THAT(quirks()->DropTable(&connection, "temptable", &error),
IsOkStatus(&error));
+ ASSERT_THAT(quirks()->DropTempTable(&connection, "temptable", &error),
+ IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
ADBC_OPTION_VALUE_DISABLED, &error),
@@ -895,6 +896,39 @@ TEST_F(PostgresStatementTest, SqlIngestTemporaryTable) {
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error));
+
+ {
+ adbc_validation::Handle<struct ArrowSchema> schema;
+ adbc_validation::Handle<struct ArrowArray> batch;
+
+ ArrowSchemaInit(&schema.value);
+ ASSERT_THAT(ArrowSchemaSetTypeStruct(&schema.value, 1),
adbc_validation::IsOkErrno());
+ ASSERT_THAT(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_INT64),
+ adbc_validation::IsOkErrno());
+ ASSERT_THAT(ArrowSchemaSetName(schema->children[0], "ints"),
+ adbc_validation::IsOkErrno());
+
+ ASSERT_THAT((adbc_validation::MakeBatch<int64_t>(
+ &schema.value, &batch.value, static_cast<struct
ArrowError*>(nullptr),
+ {-1, 0, 1, std::nullopt})),
+ adbc_validation::IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ "temptable", &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement, &batch.value, &schema.value,
&error),
+ IsOkStatus(&error));
+ // temporary tables are not targeted unless ADBC_INGEST_OPTION_TEMPORARY is set
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr,
&error),
+ IsStatus(ADBC_STATUS_NOT_FOUND, &error));
+ }
+
+ ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));
+
{
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> batch;
@@ -919,6 +953,9 @@ TEST_F(PostgresStatementTest, SqlIngestTemporaryTable) {
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &batch.value, &schema.value,
&error),
IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr,
&error),
IsOkStatus(&error));
}
@@ -1145,7 +1182,7 @@ TEST_F(PostgresStatementTest,
AdbcErrorBackwardsCompatibility) {
adbc_validation::StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, error),
- IsStatus(ADBC_STATUS_INVALID_ARGUMENT, error));
+ IsStatus(ADBC_STATUS_NOT_FOUND, error));
ASSERT_EQ("42P01", std::string_view(error->sqlstate, 5));
ASSERT_EQ(0, AdbcErrorGetDetailCount(error));
diff --git a/c/driver/postgresql/result_helper.cc
b/c/driver/postgresql/result_helper.cc
new file mode 100644
index 00000000..3a2a0d01
--- /dev/null
+++ b/c/driver/postgresql/result_helper.cc
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "result_helper.h"
+
+#include "common/utils.h"
+#include "error.h"
+
+namespace adbcpq {
+
+PqResultHelper::~PqResultHelper() {
+ if (result_ != nullptr) {
+ PQclear(result_);
+ }
+}
+
+AdbcStatusCode PqResultHelper::Prepare() {
+ // TODO: make stmtName a unique identifier?
+ PGresult* result =
+ PQprepare(conn_, /*stmtName=*/"", query_.c_str(), param_values_.size(),
NULL);
+ if (PQresultStatus(result) != PGRES_COMMAND_OK) {
+ AdbcStatusCode code =
+ SetError(error_, result, "[libpq] Failed to prepare query: %s\nQuery
was:%s",
+ PQerrorMessage(conn_), query_.c_str());
+ PQclear(result);
+ return code;
+ }
+
+ PQclear(result);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::Execute() {
+ std::vector<const char*> param_c_strs;
+
+ for (size_t index = 0; index < param_values_.size(); index++) {
+ param_c_strs.push_back(param_values_[index].c_str());
+ }
+
+ result_ =
+ PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(),
NULL, NULL, 0);
+
+ ExecStatusType status = PQresultStatus(result_);
+ if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
+ AdbcStatusCode error =
+ SetError(error_, result_, "[libpq] Failed to execute query '%s': %s",
+ query_.c_str(), PQerrorMessage(conn_));
+ return error;
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+} // namespace adbcpq
diff --git a/c/driver/postgresql/result_helper.h
b/c/driver/postgresql/result_helper.h
new file mode 100644
index 00000000..e9307dcb
--- /dev/null
+++ b/c/driver/postgresql/result_helper.h
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cassert>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <adbc.h>
+#include <libpq-fe.h>
+
+namespace adbcpq {
+
+/// \brief A single column in a single row of a result set.
+struct PqRecord {
+ const char* data;
+ const int len;
+ const bool is_null;
+
+ // XXX: can't use optional due to R
+ std::pair<bool, double> ParseDouble() const {
+ char* end;
+ double result = std::strtod(data, &end);
+ if (errno != 0 || end == data) {
+ return std::make_pair(false, 0.0);
+ }
+ return std::make_pair(true, result);
+ }
+};
+
+// Used by PqResultHelper to provide index-based access to the records within
each
+// row of a PGresult
+class PqResultRow {
+ public:
+ PqResultRow(PGresult* result, int row_num) : result_(result),
row_num_(row_num) {
+ ncols_ = PQnfields(result);
+ }
+
+ PqRecord operator[](const int& col_num) {
+ assert(col_num < ncols_);
+ const char* data = PQgetvalue(result_, row_num_, col_num);
+ const int len = PQgetlength(result_, row_num_, col_num);
+ const bool is_null = PQgetisnull(result_, row_num_, col_num);
+
+ return PqRecord{data, len, is_null};
+ }
+
+ private:
+ PGresult* result_ = nullptr;
+ int row_num_;
+ int ncols_;
+};
+
+// Helper to manage the lifecycle of a PGresult. The constructor only stores
+// the query (nothing is sent yet); the destructor clears the result, if any.
+// Caller must call Prepare then Execute, checking both for an OK
+// AdbcStatusCode prior to iterating
+class PqResultHelper {
+ public:
+ explicit PqResultHelper(PGconn* conn, std::string query, struct AdbcError*
error)
+ : conn_(conn), query_(std::move(query)), error_(error) {}
+
+ explicit PqResultHelper(PGconn* conn, std::string query,
+ std::vector<std::string> param_values, struct
AdbcError* error)
+ : conn_(conn),
+ query_(std::move(query)),
+ param_values_(std::move(param_values)),
+ error_(error) {}
+
+ ~PqResultHelper();
+
+ AdbcStatusCode Prepare();
+ AdbcStatusCode Execute();
+
+ int NumRows() const { return PQntuples(result_); }
+
+ int NumColumns() const { return PQnfields(result_); }
+
+ class iterator {
+ const PqResultHelper& outer_;
+ int curr_row_ = 0;
+
+ public:
+ explicit iterator(const PqResultHelper& outer, int curr_row = 0)
+ : outer_(outer), curr_row_(curr_row) {}
+ iterator& operator++() {
+ curr_row_++;
+ return *this;
+ }
+ iterator operator++(int) {
+ iterator retval = *this;
+ ++(*this);
+ return retval;
+ }
+ bool operator==(iterator other) const {
+ return outer_.result_ == other.outer_.result_ && curr_row_ ==
other.curr_row_;
+ }
+ bool operator!=(iterator other) const { return !(*this == other); }
+ PqResultRow operator*() { return PqResultRow(outer_.result_, curr_row_); }
+ using iterator_category = std::forward_iterator_tag;
+ using difference_type = std::ptrdiff_t;
+ using value_type = std::vector<PqResultRow>;
+ using pointer = const std::vector<PqResultRow>*;
+ using reference = const std::vector<PqResultRow>&;
+ };
+
+ iterator begin() { return iterator(*this); }
+ iterator end() { return iterator(*this, NumRows()); }
+
+ private:
+ PGresult* result_ = nullptr;
+ PGconn* conn_;
+ std::string query_;
+ std::vector<std::string> param_values_;
+ struct AdbcError* error_;
+};
+} // namespace adbcpq
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 9fbbcaf9..22e394c6 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -38,6 +38,7 @@
#include "postgres_copy_reader.h"
#include "postgres_type.h"
#include "postgres_util.h"
+#include "result_helper.h"
namespace adbcpq {
@@ -830,11 +831,17 @@ AdbcStatusCode PostgresStatement::Cancel(struct
AdbcError* error) {
}
AdbcStatusCode PostgresStatement::CreateBulkTable(
- const struct ArrowSchema& source_schema,
+ const std::string& current_schema, const struct ArrowSchema& source_schema,
const std::vector<struct ArrowSchemaView>& source_schema_fields,
std::string* escaped_table, struct AdbcError* error) {
PGconn* conn = connection_->conn();
+ if (!ingest_.db_schema.empty() && ingest_.temporary) {
+ SetError(error, "[libpq] Cannot set both %s and %s",
+ ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
ADBC_INGEST_OPTION_TEMPORARY);
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
{
if (!ingest_.db_schema.empty()) {
char* escaped =
@@ -847,6 +854,17 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
*escaped_table += escaped;
*escaped_table += " . ";
PQfreemem(escaped);
+ } else if (ingest_.temporary) {
+ // OK to be redundant (CREATE TEMPORARY TABLE pg_temp.foo)
+ *escaped_table += "pg_temp . ";
+ } else {
+ // Explicitly specify the current schema to avoid any temporary tables
+ // shadowing this table
+ char* escaped =
+ PQescapeIdentifier(conn, current_schema.c_str(),
current_schema.size());
+ *escaped_table += escaped;
+ *escaped_table += " . ";
+ PQfreemem(escaped);
}
if (!ingest_.target.empty()) {
@@ -862,7 +880,14 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
}
}
- std::string create = "CREATE TABLE ";
+ std::string create;
+
+ if (ingest_.temporary) {
+ create = "CREATE TEMPORARY TABLE ";
+ } else {
+ create = "CREATE TABLE ";
+ }
+
switch (ingest_.mode) {
case IngestMode::kCreate:
// Nothing to do
@@ -1098,12 +1123,27 @@ AdbcStatusCode
PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
return ADBC_STATUS_INVALID_STATE;
}
+ // Need the current schema to avoid being shadowed by temp tables
+ // This is a little unfortunate; we need another DB roundtrip
+ std::string current_schema;
+ {
+ PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA",
{}, error};
+ RAISE_ADBC(result_helper.Prepare());
+ RAISE_ADBC(result_helper.Execute());
+ auto it = result_helper.begin();
+ if (it == result_helper.end()) {
+ SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT
CURRENT_SCHEMA'");
+ return ADBC_STATUS_INTERNAL;
+ }
+ current_schema = (*it)[0].data;
+ }
+
BindStream bind_stream(std::move(bind_));
std::memset(&bind_, 0, sizeof(bind_));
std::string escaped_table;
RAISE_ADBC(bind_stream.Begin(
[&]() -> AdbcStatusCode {
- return CreateBulkTable(bind_stream.bind_schema.value,
+ return CreateBulkTable(current_schema, bind_stream.bind_schema.value,
bind_stream.bind_schema_fields, &escaped_table,
error);
},
error));
@@ -1247,7 +1287,11 @@ AdbcStatusCode PostgresStatement::SetOption(const char*
key, const char* value,
prepared_ = false;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) == 0) {
query_.clear();
- ingest_.db_schema = value;
+ if (value == nullptr) {
+ ingest_.db_schema.clear();
+ } else {
+ ingest_.db_schema = value;
+ }
prepared_ = false;
} else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
@@ -1263,6 +1307,17 @@ AdbcStatusCode PostgresStatement::SetOption(const char*
key, const char* value,
return ADBC_STATUS_INVALID_ARGUMENT;
}
prepared_ = false;
+ } else if (std::strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) {
+ if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+ ingest_.temporary = true;
+ } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+ ingest_.temporary = false;
+ } else {
+ SetError(error, "[libpq] Invalid value '%s' for option '%s'", value,
key);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ ingest_.db_schema.clear();
+ prepared_ = false;
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) ==
0) {
int64_t int_value = std::atol(value);
if (int_value <= 0) {
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 013334fe..20bb3b7a 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -126,7 +126,7 @@ class PostgresStatement {
void ClearResult();
AdbcStatusCode CreateBulkTable(
- const struct ArrowSchema& source_schema,
+ const std::string& current_schema, const struct ArrowSchema&
source_schema,
const std::vector<struct ArrowSchemaView>& source_schema_fields,
std::string* escaped_table, struct AdbcError* error);
AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError*
error);
@@ -157,6 +157,7 @@ class PostgresStatement {
std::string db_schema;
std::string target;
IngestMode mode = IngestMode::kCreate;
+ bool temporary = false;
} ingest_;
TupleReader reader_;
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
index 2c46976a..83cebec0 100644
--- a/c/driver/sqlite/sqlite.c
+++ b/c/driver/sqlite/sqlite.c
@@ -1099,13 +1099,34 @@ AdbcStatusCode SqliteStatementInitIngest(struct
SqliteStatement* stmt,
goto cleanup;
}
+ if (stmt->target_catalog != NULL && stmt->temporary != 0) {
+ SetError(error, "[SQLite] Cannot specify both %s and %s",
+ ADBC_INGEST_OPTION_TARGET_CATALOG, ADBC_INGEST_OPTION_TEMPORARY);
+ code = ADBC_STATUS_INVALID_STATE;
+ goto cleanup;
+ }
+
if (stmt->target_catalog != NULL) {
table = sqlite3_mprintf("\"%w\" . \"%w\"", stmt->target_catalog,
stmt->target_table);
+ } else if (stmt->temporary == 0) {
+ // If not temporary, explicitly target the main database
+ table = sqlite3_mprintf("main . \"%w\"", stmt->target_table);
} else {
- table = sqlite3_mprintf("\"%w\"", stmt->target_table);
+ // OK to be redundant (CREATE TEMP TABLE temp.foo)
+ table = sqlite3_mprintf("temp . \"%w\"", stmt->target_table);
+ }
+
+ if (table == NULL) {
+ // Allocation failure
+ code = ADBC_STATUS_INTERNAL;
+ goto cleanup;
}
- sqlite3_str_appendf(create_query, "CREATE TABLE %s (", table);
+ if (stmt->temporary != 0) {
+ sqlite3_str_appendf(create_query, "CREATE TEMPORARY TABLE %s (", table);
+ } else {
+ sqlite3_str_appendf(create_query, "CREATE TABLE %s (", table);
+ }
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] Failed to build CREATE: %s",
sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
@@ -1502,6 +1523,16 @@ AdbcStatusCode SqliteStatementSetOption(struct
AdbcStatement* statement, const c
return ADBC_STATUS_INVALID_ARGUMENT;
}
return ADBC_STATUS_OK;
+ } else if (strcmp(key, ADBC_INGEST_OPTION_TEMPORARY) == 0) {
+ if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+ stmt->temporary = 1;
+ } else if (strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+ stmt->temporary = 0;
+ } else {
+ SetError(error, "[SQLite] Invalid statement option value %s=%s", key,
value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ return ADBC_STATUS_OK;
} else if (strcmp(key, kStatementOptionBatchRows) == 0) {
char* end = NULL;
long batch_size = strtol(value, &end, /*base=*/10); // NOLINT(runtime/int)
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 5fdb8628..c95a3f1d 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -45,20 +45,24 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
AdbcStatusCode DropTable(struct AdbcConnection* connection, const
std::string& name,
struct AdbcError* error) const override {
- struct AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- AdbcStatusCode status = AdbcStatementNew(connection, &statement, error);
- if (status != ADBC_STATUS_OK) return status;
+ adbc_validation::Handle<struct AdbcStatement> statement;
+ RAISE_ADBC(AdbcStatementNew(connection, &statement.value, error));
std::string query = "DROP TABLE IF EXISTS \"" + name + "\"";
- status = AdbcStatementSetSqlQuery(&statement, query.c_str(), error);
- if (status != ADBC_STATUS_OK) {
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
- }
- status = AdbcStatementExecuteQuery(&statement, nullptr, nullptr, error);
- std::ignore = AdbcStatementRelease(&statement, error);
- return status;
+ RAISE_ADBC(AdbcStatementSetSqlQuery(&statement.value, query.c_str(),
error));
+ RAISE_ADBC(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
error));
+ return AdbcStatementRelease(&statement.value, error);
+ }
+
+ AdbcStatusCode DropTempTable(struct AdbcConnection* connection, const
std::string& name,
+ struct AdbcError* error) const override {
+ adbc_validation::Handle<struct AdbcStatement> statement;
+ RAISE_ADBC(AdbcStatementNew(connection, &statement.value, error));
+
+ std::string query = "DROP TABLE IF EXISTS temp . \"" + name + "\"";
+ RAISE_ADBC(AdbcStatementSetSqlQuery(&statement.value, query.c_str(),
error));
+ RAISE_ADBC(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
error));
+ return AdbcStatementRelease(&statement.value, error);
}
std::string BindParameter(int index) const override { return "?"; }
@@ -98,6 +102,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
std::strcmp(mode, ADBC_INGEST_OPTION_MODE_CREATE) == 0;
}
bool supports_bulk_ingest_catalog() const override { return true; }
+ bool supports_bulk_ingest_temporary() const override { return true; }
bool supports_concurrent_statements() const override { return true; }
bool supports_get_option() const override { return false; }
std::optional<adbc_validation::SqlInfoValue> supports_get_sql_info(
diff --git a/c/driver/sqlite/types.h b/c/driver/sqlite/types.h
index 805aed46..c9e57e33 100644
--- a/c/driver/sqlite/types.h
+++ b/c/driver/sqlite/types.h
@@ -53,6 +53,7 @@ struct SqliteStatement {
char* target_catalog;
char* target_table;
char append;
+ char temporary;
// -- Query options ---------------------------------------
int batch_size;
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index beebb177..d25f236b 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -2087,6 +2087,430 @@ void StatementTest::TestSqlIngestTargetCatalogSchema() {
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
}
+void StatementTest::TestSqlIngestTemporary() {
+ if (!quirks()->supports_bulk_ingest_temporary() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+ GTEST_SKIP();
+ }
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error),
IsOkStatus(&error));
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTemporaryAppend() {
+ // Append to temp table shouldn't affect actual table and vice versa
+ if (!quirks()->supports_bulk_ingest_temporary() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND)) {
+ GTEST_SKIP();
+ }
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error),
IsOkStatus(&error));
+
+ // Create both tables with different schemas
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<std::string>(&schema.value, &array.value, &na_error,
+ {"foo", "bar", std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ // Append to the temporary table
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{0, 1, 2})),
+ IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ // Append to the normal table
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT(
+ (MakeBatch<std::string>(&schema.value, &array.value, &na_error, {"",
"a", "b"})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTemporaryReplace() {
+ // Replace temp table shouldn't affect actual table and vice versa
+ if (!quirks()->supports_bulk_ingest_temporary() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND) ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_REPLACE)) {
+ GTEST_SKIP();
+ }
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ std::string name = "bulk_ingest";
+
+ ASSERT_THAT(quirks()->DropTable(&connection, name, &error),
IsOkStatus(&error));
+ ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error),
IsOkStatus(&error));
+
+ // Create both tables with different schemas
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<std::string>(&schema.value, &array.value, &na_error,
+ {"foo", "bar", std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ // Replace both tables with different schemas
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints2", NANOARROW_TYPE_INT64},
+ {"strs2", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value,
&na_error,
+ {0, 1, std::nullopt},
+ {"foo", "bar",
std::nullopt})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_REPLACE,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints3", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{1, 2, 3})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_REPLACE,
&error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ // Now append to the replaced tables to check that the schemas are as expected
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints2", NANOARROW_TYPE_INT64},
+ {"strs2", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value,
&na_error,
+ {0, 1, std::nullopt},
+ {"foo", "bar",
std::nullopt})),
+ IsOkErrno());
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints3", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{4, 5, 6})),
+ IsOkErrno());
+
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_DISABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsOkStatus(&error));
+ }
+
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+}
+
+void StatementTest::TestSqlIngestTemporaryExclusive() {
+ // Can't set target schema/catalog with temp table
+ if (!quirks()->supports_bulk_ingest_temporary() ||
+ !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
+ GTEST_SKIP();
+ }
+
+ std::string name = "bulk_ingest";
+ ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error),
IsOkStatus(&error));
+
+ if (quirks()->supports_bulk_ingest_catalog()) {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ std::string catalog = quirks()->catalog();
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(
+ AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_CATALOG,
+ catalog.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsStatus(ADBC_STATUS_INVALID_STATE, &error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+ }
+
+ if (quirks()->supports_bulk_ingest_db_schema()) {
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> array;
+ struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}),
IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
+
+ std::string db_schema = quirks()->db_schema();
+
+ Handle<struct AdbcStatement> statement;
+ ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TEMPORARY,
+ ADBC_OPTION_VALUE_ENABLED, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_TABLE,
+ name.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(
+ AdbcStatementSetOption(&statement.value,
ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
+ db_schema.c_str(), &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value,
&schema.value, &error),
+ IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr,
&error),
+ IsStatus(ADBC_STATUS_INVALID_STATE, &error));
+ ASSERT_THAT(AdbcStatementRelease(&statement.value, &error),
IsOkStatus(&error));
+ }
+}
+
void StatementTest::TestSqlPartitionedInts() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 56f44ba0..0d936de7 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -50,6 +50,13 @@ class DriverQuirks {
return ADBC_STATUS_OK;
}
+ /// \brief Drop the given temporary table. Used by tests to reset state.
+ virtual AdbcStatusCode DropTempTable(struct AdbcConnection* connection,
+ const std::string& name,
+ struct AdbcError* error) const {
+ return ADBC_STATUS_OK;
+ }
+
/// \brief Drop the given view. Used by tests to reset state.
virtual AdbcStatusCode DropView(struct AdbcConnection* connection,
const std::string& name,
@@ -97,6 +104,9 @@ class DriverQuirks {
/// \brief Whether bulk ingest to a specific schema is supported
virtual bool supports_bulk_ingest_db_schema() const { return false; }
+ /// \brief Whether bulk ingest to a temporary table is supported
+ virtual bool supports_bulk_ingest_temporary() const { return false; }
+
/// \brief Whether we can cancel queries.
virtual bool supports_cancel() const { return false; }
@@ -300,6 +310,10 @@ class StatementTest {
void TestSqlIngestTargetCatalog();
void TestSqlIngestTargetSchema();
void TestSqlIngestTargetCatalogSchema();
+ void TestSqlIngestTemporary();
+ void TestSqlIngestTemporaryAppend();
+ void TestSqlIngestTemporaryReplace();
+ void TestSqlIngestTemporaryExclusive();
void TestSqlPartitionedInts();
@@ -385,6 +399,10 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestTargetCatalog) { TestSqlIngestTargetCatalog(); }
\
TEST_F(FIXTURE, SqlIngestTargetSchema) { TestSqlIngestTargetSchema(); }
\
TEST_F(FIXTURE, SqlIngestTargetCatalogSchema) {
TestSqlIngestTargetCatalogSchema(); } \
+ TEST_F(FIXTURE, SqlIngestTemporary) { TestSqlIngestTemporary(); }
\
+ TEST_F(FIXTURE, SqlIngestTemporaryAppend) { TestSqlIngestTemporaryAppend();
} \
+ TEST_F(FIXTURE, SqlIngestTemporaryReplace) {
TestSqlIngestTemporaryReplace(); } \
+ TEST_F(FIXTURE, SqlIngestTemporaryExclusive) {
TestSqlIngestTemporaryExclusive(); } \
TEST_F(FIXTURE, SqlPartitionedInts) { TestSqlPartitionedInts(); }
\
TEST_F(FIXTURE, SqlPrepareGetParameterSchema) {
TestSqlPrepareGetParameterSchema(); } \
TEST_F(FIXTURE, SqlPrepareSelectNoParams) { TestSqlPrepareSelectNoParams();
} \
diff --git a/python/adbc_driver_manager/adbc_driver_manager/__init__.py
b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
index 336fdb21..61cd8bb1 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/__init__.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/__init__.py
@@ -128,5 +128,8 @@ class StatementOptions(enum.Enum):
#: For bulk ingestion, the schema to create/locate the table in.
#: **This API is EXPERIMENTAL.**
INGEST_TARGET_DB_SCHEMA = "adbc.ingest.target_db_schema"
+ #: For bulk ingestion, use a temporary table.
+ #: **This API is EXPERIMENTAL.**
+ INGEST_TEMPORARY = "adbc.ingest.temporary"
#: Get progress of a query.
PROGRESS = "adbc.statement.exec.progress"
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index f358a0fa..5b563805 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -795,9 +795,9 @@ class Cursor(_Closeable):
*,
catalog_name: Optional[str] = None,
db_schema_name: Optional[str] = None,
+ temporary: bool = False,
) -> int:
- """
- Ingest Arrow data into a database table.
+ """Ingest Arrow data into a database table.
Depending on the driver, this can avoid per-row overhead that
would result from a typical prepare-bind-insert loop.
@@ -821,6 +821,11 @@ class Cursor(_Closeable):
db_schema_name
If given, the schema to create/locate the table in.
**This API is EXPERIMENTAL.**
+ temporary
+ Whether to ingest to a temporary table or not. Most drivers will
+ not support setting this along with catalog_name and/or
+ db_schema_name.
+ **This API is EXPERIMENTAL.**
Returns
-------
@@ -831,6 +836,7 @@ class Cursor(_Closeable):
Notes
-----
This is an extension and not part of the DBAPI standard.
+
"""
if mode == "append":
c_mode = _lib.INGEST_OPTION_MODE_APPEND
@@ -857,6 +863,22 @@ class Cursor(_Closeable):
] = db_schema_name
self._stmt.set_options(**options)
+ if temporary:
+ self._stmt.set_options(
+ **{
+
adbc_driver_manager.StatementOptions.INGEST_TEMPORARY.value: "true",
+ }
+ )
+ else:
+ # Need to explicitly clear it, but not all drivers support this
+ options = {
+ adbc_driver_manager.StatementOptions.INGEST_TEMPORARY.value:
"false",
+ }
+ try:
+ self._stmt.set_options(**options)
+ except NotSupportedError:
+ pass
+
if isinstance(data, pyarrow.RecordBatch):
array = _lib.ArrowArrayHandle()
schema = _lib.ArrowSchemaHandle()
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py
b/python/adbc_driver_postgresql/tests/test_dbapi.py
index 93779db1..9b4b7451 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -174,7 +174,7 @@ def test_stmt_ingest(postgres: dbapi.Connection) -> None:
cur.execute("DROP TABLE IF EXISTS test_ingest")
with pytest.raises(
- postgres.ProgrammingError, match='"test_ingest" does not exist'
+ postgres.ProgrammingError, match='"public.test_ingest" does not exist'
):
cur.adbc_ingest("test_ingest", table, mode="append")
postgres.rollback()
@@ -263,3 +263,75 @@ def test_ingest(postgres: dbapi.Connection) -> None:
with pytest.raises(dbapi.NotSupportedError):
cur.adbc_ingest("foo", table, catalog_name="main")
+
+
+def test_ingest_temporary(postgres: dbapi.Connection) -> None:
+ table = pyarrow.Table.from_pydict(
+ {
+ "numbers": [1, 2],
+ "letters": ["a", "b"],
+ }
+ )
+ temp = pyarrow.Table.from_pydict(
+ {
+ "ints": [3, 4],
+ "strs": ["c", "d"],
+ }
+ )
+
+ table2 = pyarrow.Table.from_pydict(
+ {
+ "numbers": [1, 2, 1, 2],
+ "letters": ["a", "b", "a", "b"],
+ }
+ )
+ temp2 = pyarrow.Table.from_pydict(
+ {
+ "ints": [3, 4, 3, 4],
+ "strs": ["c", "d", "c", "d"],
+ }
+ )
+
+ with postgres.cursor() as cur:
+ cur.execute("DROP TABLE IF EXISTS public.temporary")
+ cur.execute("DROP TABLE IF EXISTS pg_temp.temporary")
+
+ cur.adbc_ingest("temporary", table, mode="create")
+ cur.adbc_ingest("temporary", temp, mode="create", temporary=True)
+
+ cur.execute("SELECT * FROM public.temporary")
+ assert cur.fetch_arrow_table() == table
+ cur.execute("SELECT * FROM pg_temp.temporary")
+ assert cur.fetch_arrow_table() == temp
+ cur.execute("SELECT * FROM temporary")
+ assert cur.fetch_arrow_table() == temp
+
+ cur.adbc_ingest("temporary", table, mode="append")
+ cur.adbc_ingest("temporary", temp, mode="append", temporary=True)
+
+ cur.execute("SELECT * FROM public.temporary")
+ assert cur.fetch_arrow_table() == table2
+ cur.execute("SELECT * FROM pg_temp.temporary")
+ assert cur.fetch_arrow_table() == temp2
+ cur.execute("SELECT * FROM temporary")
+ assert cur.fetch_arrow_table() == temp2
+
+ cur.adbc_ingest("temporary", table, mode="replace")
+ cur.adbc_ingest("temporary", temp, mode="replace", temporary=True)
+
+ cur.execute("SELECT * FROM public.temporary")
+ assert cur.fetch_arrow_table() == table
+ cur.execute("SELECT * FROM pg_temp.temporary")
+ assert cur.fetch_arrow_table() == temp
+ cur.execute("SELECT * FROM temporary")
+ assert cur.fetch_arrow_table() == temp
+
+ cur.adbc_ingest("temporary", table, mode="create_append")
+ cur.adbc_ingest("temporary", temp, mode="create_append",
temporary=True)
+
+ cur.execute("SELECT * FROM public.temporary")
+ assert cur.fetch_arrow_table() == table2
+ cur.execute("SELECT * FROM pg_temp.temporary")
+ assert cur.fetch_arrow_table() == temp2
+ cur.execute("SELECT * FROM temporary")
+ assert cur.fetch_arrow_table() == temp2
diff --git a/r/adbcpostgresql/bootstrap.R b/r/adbcpostgresql/bootstrap.R
index c3e06795..9bcc4149 100644
--- a/r/adbcpostgresql/bootstrap.R
+++ b/r/adbcpostgresql/bootstrap.R
@@ -31,6 +31,8 @@ files_to_vendor <- c(
"../../c/driver/postgresql/database.h",
"../../c/driver/postgresql/database.cc",
"../../c/driver/postgresql/postgresql.cc",
+ "../../c/driver/postgresql/result_helper.h",
+ "../../c/driver/postgresql/result_helper.cc",
"../../c/driver/common/options.h",
"../../c/driver/common/utils.h",
"../../c/driver/common/utils.c",
diff --git a/r/adbcpostgresql/src/Makevars.in b/r/adbcpostgresql/src/Makevars.in
index 8244768b..456c56f9 100644
--- a/r/adbcpostgresql/src/Makevars.in
+++ b/r/adbcpostgresql/src/Makevars.in
@@ -22,6 +22,7 @@ OBJECTS = init.o \
error.o \
connection.o \
database.o \
+ result_helper.o \
statement.o \
common/utils.o \
postgresql.o \
diff --git a/r/adbcpostgresql/src/Makevars.ucrt
b/r/adbcpostgresql/src/Makevars.ucrt
index a72e9845..bf9816e6 100644
--- a/r/adbcpostgresql/src/Makevars.ucrt
+++ b/r/adbcpostgresql/src/Makevars.ucrt
@@ -22,6 +22,7 @@ OBJECTS = init.o \
error.o \
connection.o \
database.o \
+ result_helper.o \
statement.o \
postgresql.o \
common/utils.o \
diff --git a/r/adbcpostgresql/src/Makevars.win
b/r/adbcpostgresql/src/Makevars.win
index 331ef274..0b4ab3c5 100644
--- a/r/adbcpostgresql/src/Makevars.win
+++ b/r/adbcpostgresql/src/Makevars.win
@@ -25,6 +25,7 @@ OBJECTS = init.o \
error.o \
connection.o \
database.o \
+ result_helper.o \
statement.o \
postgresql.o \
common/utils.o \