This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new a4a0cc137 feat(c/driver/postgresql): Implement ingestion of list types
for PostgreSQL (#2153)
a4a0cc137 is described below
commit a4a0cc1370d0f105b47c57b69376e748baa53df6
Author: Dewey Dunnington <[email protected]>
AuthorDate: Thu Sep 12 10:50:10 2024 -0500
feat(c/driver/postgresql): Implement ingestion of list types for PostgreSQL
(#2153)
This PR adds list ingest support in the PostgreSQL driver. The raw COPY
and type support had already been added; however, there were still some
hard-coded switch-on-type blocks that implemented some necessary features
(e.g., the CREATE TABLE syntax for each type). I moved these to the
type/copy section and removed the previous implementation (which should
also help a future PR implement the ability to bind nested types
as parameters).
I also added the infrastructure to test this in all drivers via the
validation suite, which required modifying some of the built-in concepts
to accommodate a nested type.
Closes #2066.
``` r
library(adbcdrivermanager)
con <- adbc_database_init(
adbcpostgresql::adbcpostgresql(),
uri =
"postgresql://localhost:5432/postgres?user=postgres&password=password"
) |>
adbc_connection_init()
df <- tibble::tibble(
A = vctrs::list_of(1:3, 4:5, 6:10),
B = c(10.0, 20.0, 30.0)
)
con |>
execute_adbc("DROP TABLE IF EXISTS table_with_list")
df |>
write_adbc(con, "table_with_list")
con |>
read_adbc("select * from table_with_list") |>
tibble::as_tibble() |>
dplyr::pull(1)
#> <list_of<integer>[3]>
#> [[1]]
#> [1] 1 2 3
#>
#> [[2]]
#> [1] 4 5
#>
#> [[3]]
#> [1] 6 7 8 9 10
```
<sup>Created on 2024-09-11 with [reprex
v2.1.1](https://reprex.tidyverse.org)</sup>
---
.../postgresql/copy/postgres_copy_writer_test.cc | 162 +++++----------------
c/driver/postgresql/postgres_type.h | 33 +++++
c/driver/postgresql/postgres_type_test.cc | 5 +-
c/driver/postgresql/postgresql_test.cc | 2 +-
c/driver/postgresql/statement.cc | 99 ++-----------
c/driver/postgresql/statement.h | 10 +-
c/driver/sqlite/sqlite_test.cc | 6 +
c/driver_manager/adbc_driver_manager_test.cc | 6 +
c/validation/adbc_validation.h | 25 ++++
c/validation/adbc_validation_connection.cc | 8 +-
c/validation/adbc_validation_statement.cc | 40 ++++-
c/validation/adbc_validation_util.cc | 50 +++++--
c/validation/adbc_validation_util.h | 73 +++++++++-
13 files changed, 274 insertions(+), 245 deletions(-)
diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc
b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
index 618f27cf1..5010848cf 100644
--- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc
+++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
@@ -836,32 +836,16 @@ TEST_P(PostgresCopyListTest,
PostgresCopyWriteListSmallInt) {
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);
-
- ASSERT_EQ(ArrowSchemaInitFromType(schema->children[0], GetParam()),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_INT16),
- NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 0),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeSchema(
+ &schema.value, {adbc_validation::SchemaField::Nested(
+ "col", GetParam(), {{"item",
NANOARROW_TYPE_INT16}})}),
+ ADBC_STATUS_OK);
- ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int16_t>>(
+ &schema.value, &array.value, &na_error,
+ {std::vector<int16_t>{-123, -1}, std::vector<int16_t>{0, 1,
123},
+ std::nullopt}),
+ ADBC_STATUS_OK);
PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
@@ -882,32 +866,16 @@ TEST_P(PostgresCopyListTest,
PostgresCopyWriteListInteger) {
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);
-
- ASSERT_EQ(ArrowSchemaInitFromType(schema->children[0], GetParam()),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_INT32),
- NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 0),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeSchema(
+ &schema.value, {adbc_validation::SchemaField::Nested(
+ "col", GetParam(), {{"item",
NANOARROW_TYPE_INT32}})}),
+ ADBC_STATUS_OK);
- ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>(
+ &schema.value, &array.value, &na_error,
+ {std::vector<int32_t>{-123, -1}, std::vector<int32_t>{0, 1,
123},
+ std::nullopt}),
+ ADBC_STATUS_OK);
PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
@@ -942,32 +910,16 @@ TEST_P(PostgresCopyListTest, PostgresCopyWriteListBigInt)
{
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);
-
- ASSERT_EQ(ArrowSchemaInitFromType(schema->children[0], GetParam()),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_INT64),
- NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 0),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 123),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeSchema(
+ &schema.value, {adbc_validation::SchemaField::Nested(
+ "col", GetParam(), {{"item",
NANOARROW_TYPE_INT64}})}),
+ ADBC_STATUS_OK);
- ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int64_t>>(
+ &schema.value, &array.value, &na_error,
+ {std::vector<int64_t>{-123, -1}, std::vector<int64_t>{0, 1,
123},
+ std::nullopt}),
+ ADBC_STATUS_OK);
PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
@@ -1002,38 +954,17 @@ TEST_P(PostgresCopyListTest,
PostgresCopyWriteListVarchar) {
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK);
-
- ASSERT_EQ(ArrowSchemaInitFromType(schema->children[0], GetParam()),
NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK);
- ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_STRING),
- NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0],
ArrowCharView("foo")),
- NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0],
ArrowCharView("bar")),
- NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0],
ArrowCharView("baz")),
- NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendString(array->children[0]->children[0],
ArrowCharView("qux")),
- NANOARROW_OK);
ASSERT_EQ(
- ArrowArrayAppendString(array->children[0]->children[0],
ArrowCharView("quux")),
- NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
+ adbc_validation::MakeSchema(
+ &schema.value, {adbc_validation::SchemaField::Nested(
+ "col", GetParam(), {{"item",
NANOARROW_TYPE_STRING}})}),
+ ADBC_STATUS_OK);
- ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<std::string>>(
+ &schema.value, &array.value, &na_error,
+ {std::vector<std::string>{"foo", "bar"},
+ std::vector<std::string>{"baz", "qux", "quux"},
std::nullopt}),
+ ADBC_STATUS_OK);
PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
@@ -1079,23 +1010,10 @@ TEST_F(PostgresCopyTest,
PostgresCopyWriteFixedSizeListInteger) {
ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0],
NANOARROW_TYPE_INT32),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayInitFromSchema(&array.value, &schema.value, nullptr),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayStartAppending(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], 2),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -1),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayAppendInt(array->children[0]->children[0], -2),
NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(array->children[0]), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayAppendNull(array->children[0], 1), NANOARROW_OK);
- ASSERT_EQ(ArrowArrayFinishElement(&array.value), NANOARROW_OK);
-
- ASSERT_EQ(ArrowArrayFinishBuildingDefault(&array.value, &na_error),
NANOARROW_OK);
+ ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>(
+ &schema.value, &array.value, &na_error,
+ {std::vector<int32_t>{1, 2}, std::vector<int32_t>{-1, -2},
std::nullopt}),
+ ADBC_STATUS_OK);
PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_),
NANOARROW_OK);
diff --git a/c/driver/postgresql/postgres_type.h
b/c/driver/postgresql/postgres_type.h
index df59b113e..28891a7a9 100644
--- a/c/driver/postgresql/postgres_type.h
+++ b/c/driver/postgresql/postgres_type.h
@@ -184,6 +184,19 @@ class PostgresType {
int64_t n_children() const { return static_cast<int64_t>(children_.size()); }
const PostgresType& child(int64_t i) const { return children_[i]; }
+ // The name used to communicate this type in a CREATE TABLE statement.
+ // These are not necessarily the most idiomatic names to use but PostgreSQL
+ // will accept typname() according to the "aliases" column in
+ // https://www.postgresql.org/docs/current/datatype.html
+ const std::string sql_type_name() const {
+ switch (type_id_) {
+ case PostgresTypeId::kArray:
+ return children_[0].sql_type_name() + " ARRAY";
+ default:
+ return typname_;
+ }
+ }
+
// Sets appropriate fields of an ArrowSchema that has been initialized using
// ArrowSchemaInit. This is a recursive operation (i.e., nested types will
// initialize and set the appropriate number of children). Returns
NANOARROW_OK
@@ -551,10 +564,30 @@ inline ArrowErrorCode PostgresType::FromSchema(const
PostgresTypeResolver& resol
case NANOARROW_TYPE_DOUBLE:
return resolver.Find(resolver.GetOID(PostgresTypeId::kFloat8), out,
error);
case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_LARGE_STRING:
return resolver.Find(resolver.GetOID(PostgresTypeId::kText), out, error);
case NANOARROW_TYPE_BINARY:
+ case NANOARROW_TYPE_LARGE_BINARY:
case NANOARROW_TYPE_FIXED_SIZE_BINARY:
return resolver.Find(resolver.GetOID(PostgresTypeId::kBytea), out,
error);
+ case NANOARROW_TYPE_DATE32:
+ case NANOARROW_TYPE_DATE64:
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kDate), out, error);
+ case NANOARROW_TYPE_TIME32:
+ case NANOARROW_TYPE_TIME64:
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kTime), out, error);
+ case NANOARROW_TYPE_DURATION:
+ case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kInterval), out,
error);
+ case NANOARROW_TYPE_TIMESTAMP:
+ if (strcmp("", schema_view.timezone) == 0) {
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kTimestamptz),
out, error);
+ } else {
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kTimestamp), out,
error);
+ }
+ case NANOARROW_TYPE_DECIMAL128:
+ case NANOARROW_TYPE_DECIMAL256:
+ return resolver.Find(resolver.GetOID(PostgresTypeId::kNumeric), out,
error);
case NANOARROW_TYPE_LIST:
case NANOARROW_TYPE_LARGE_LIST:
case NANOARROW_TYPE_FIXED_SIZE_LIST: {
diff --git a/c/driver/postgresql/postgres_type_test.cc
b/c/driver/postgresql/postgres_type_test.cc
index 79bd8d590..2e713204f 100644
--- a/c/driver/postgresql/postgres_type_test.cc
+++ b/c/driver/postgresql/postgres_type_test.cc
@@ -320,11 +320,10 @@ TEST(PostgresTypeTest, PostgresTypeFromSchema) {
schema.reset();
ArrowError error;
- ASSERT_EQ(ArrowSchemaInitFromType(schema.get(),
NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO),
+ ASSERT_EQ(ArrowSchemaInitFromType(schema.get(),
NANOARROW_TYPE_INTERVAL_MONTHS),
NANOARROW_OK);
EXPECT_EQ(PostgresType::FromSchema(resolver, schema.get(), &type, &error),
ENOTSUP);
- EXPECT_STREQ(error.message,
- "Can't map Arrow type 'interval_month_day_nano' to Postgres
type");
+ EXPECT_STREQ(error.message, "Can't map Arrow type 'interval_months' to
Postgres type");
schema.reset();
}
diff --git a/c/driver/postgresql/postgresql_test.cc
b/c/driver/postgresql/postgresql_test.cc
index 142e50638..d5d94c4c8 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -1779,7 +1779,7 @@ TEST_P(PostgresTypeTest, SelectValue) {
// check type
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(adbc_validation::CompareSchema(
- &reader.schema.value, {{std::nullopt, GetParam().arrow_type, true}}));
+ &reader.schema.value, {{"", GetParam().arrow_type, true}}));
if (GetParam().arrow_type == NANOARROW_TYPE_TIMESTAMP) {
if (GetParam().sql_type.find("WITH TIME ZONE") == std::string::npos) {
ASSERT_STREQ(reader.schema->children[0]->format, "tsu:");
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index c8dbe4d78..f091b4fde 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -318,11 +318,11 @@ AdbcStatusCode PostgresStatement::Cancel(struct
AdbcError* error) {
return connection_->Cancel(error);
}
-AdbcStatusCode PostgresStatement::CreateBulkTable(
- const std::string& current_schema, const struct ArrowSchema& source_schema,
- const std::vector<struct ArrowSchemaView>& source_schema_fields,
- std::string* escaped_table, std::string* escaped_field_list,
- struct AdbcError* error) {
+AdbcStatusCode PostgresStatement::CreateBulkTable(const std::string&
current_schema,
+ const struct ArrowSchema&
source_schema,
+ std::string* escaped_table,
+ std::string*
escaped_field_list,
+ struct AdbcError* error) {
PGconn* conn = connection_->conn();
if (!ingest_.db_schema.empty() && ingest_.temporary) {
@@ -405,7 +405,7 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
create += *escaped_table;
create += " (";
- for (size_t i = 0; i < source_schema_fields.size(); i++) {
+ for (int64_t i = 0; i < source_schema.n_children; i++) {
if (i > 0) {
create += ", ";
*escaped_field_list += ", ";
@@ -422,82 +422,13 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
*escaped_field_list += escaped;
PQfreemem(escaped);
- switch (source_schema_fields[i].type) {
- case ArrowType::NANOARROW_TYPE_BOOL:
- create += " BOOLEAN";
- break;
- case ArrowType::NANOARROW_TYPE_INT8:
- case ArrowType::NANOARROW_TYPE_INT16:
- create += " SMALLINT";
- break;
- case ArrowType::NANOARROW_TYPE_INT32:
- create += " INTEGER";
- break;
- case ArrowType::NANOARROW_TYPE_INT64:
- create += " BIGINT";
- break;
- case ArrowType::NANOARROW_TYPE_FLOAT:
- create += " REAL";
- break;
- case ArrowType::NANOARROW_TYPE_DOUBLE:
- create += " DOUBLE PRECISION";
- break;
- case ArrowType::NANOARROW_TYPE_STRING:
- case ArrowType::NANOARROW_TYPE_LARGE_STRING:
- create += " TEXT";
- break;
- case ArrowType::NANOARROW_TYPE_BINARY:
- create += " BYTEA";
- break;
- case ArrowType::NANOARROW_TYPE_DATE32:
- create += " DATE";
- break;
- case ArrowType::NANOARROW_TYPE_TIMESTAMP:
- if (strcmp("", source_schema_fields[i].timezone)) {
- create += " TIMESTAMPTZ";
- } else {
- create += " TIMESTAMP";
- }
- break;
- case ArrowType::NANOARROW_TYPE_DURATION:
- case ArrowType::NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
- create += " INTERVAL";
- break;
- case ArrowType::NANOARROW_TYPE_DECIMAL128:
- case ArrowType::NANOARROW_TYPE_DECIMAL256:
- create += " DECIMAL";
- break;
- case ArrowType::NANOARROW_TYPE_DICTIONARY: {
- struct ArrowSchemaView value_view;
- CHECK_NA(INTERNAL,
- ArrowSchemaViewInit(&value_view,
source_schema.children[i]->dictionary,
- nullptr),
- error);
- switch (value_view.type) {
- case NANOARROW_TYPE_BINARY:
- case NANOARROW_TYPE_LARGE_BINARY:
- create += " BYTEA";
- break;
- case NANOARROW_TYPE_STRING:
- case NANOARROW_TYPE_LARGE_STRING:
- create += " TEXT";
- break;
- default:
- SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
- static_cast<uint64_t>(i + 1), " ('",
source_schema.children[i]->name,
- "') has unsupported dictionary value type for ingestion ",
- ArrowTypeString(value_view.type));
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- break;
- }
- default:
- SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
- static_cast<uint64_t>(i + 1), " ('",
source_schema.children[i]->name,
- "') has unsupported type for ingestion ",
- ArrowTypeString(source_schema_fields[i].type));
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
+ PostgresType pg_type;
+ struct ArrowError na_error;
+ CHECK_NA_DETAIL(INTERNAL,
+ PostgresType::FromSchema(*type_resolver_,
source_schema.children[i],
+ &pg_type, &na_error),
+ &na_error, error);
+ create += " " + pg_type.sql_type_name();
}
if (ingest_.mode == IngestMode::kAppend) {
@@ -682,11 +613,9 @@ AdbcStatusCode PostgresStatement::ExecuteIngest(struct
ArrowArrayStream* stream,
RAISE_ADBC(bind_stream.Begin(
[&]() -> AdbcStatusCode {
return CreateBulkTable(current_schema, bind_stream.bind_schema.value,
- bind_stream.bind_schema_fields, &escaped_table,
- &escaped_field_list, error);
+ &escaped_table, &escaped_field_list, error);
},
error));
- RAISE_ADBC(bind_stream.SetParamTypes(*type_resolver_, error));
std::string query = "COPY ";
query += escaped_table;
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index 246119930..9e79f41ed 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -133,11 +133,11 @@ class PostgresStatement {
// Helper methods
void ClearResult();
- AdbcStatusCode CreateBulkTable(
- const std::string& current_schema, const struct ArrowSchema&
source_schema,
- const std::vector<struct ArrowSchemaView>& source_schema_fields,
- std::string* escaped_table, std::string* escaped_field_list,
- struct AdbcError* error);
+ AdbcStatusCode CreateBulkTable(const std::string& current_schema,
+ const struct ArrowSchema& source_schema,
+ std::string* escaped_table,
+ std::string* escaped_field_list,
+ struct AdbcError* error);
AdbcStatusCode ExecuteIngest(struct ArrowArrayStream* stream, int64_t*
rows_affected,
struct AdbcError* error);
AdbcStatusCode ExecuteBind(struct ArrowArrayStream* stream, int64_t*
rows_affected,
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 320eec0e9..d644b6827 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -328,6 +328,12 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestInterval() {
GTEST_SKIP() << "Cannot ingest Interval (not implemented)";
}
+ void TestSqlIngestListOfInt32() {
+ GTEST_SKIP() << "Cannot ingest list<int32> (not implemented)";
+ }
+ void TestSqlIngestListOfString() {
+ GTEST_SKIP() << "Cannot ingest list<string> (not implemented)";
+ }
protected:
void ValidateIngestedTemporalData(struct ArrowArrayView* values, ArrowType
type,
diff --git a/c/driver_manager/adbc_driver_manager_test.cc
b/c/driver_manager/adbc_driver_manager_test.cc
index 0d8e36250..f58b2ec32 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -281,6 +281,12 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestInterval() {
GTEST_SKIP() << "Cannot ingest Interval (not implemented)";
}
+ void TestSqlIngestListOfInt32() {
+ GTEST_SKIP() << "Cannot ingest list<int32> (not implemented)";
+ }
+ void TestSqlIngestListOfString() {
+ GTEST_SKIP() << "Cannot ingest list<string> (not implemented)";
+ }
protected:
SqliteQuirks quirks_;
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index ab665ac10..3ceed0dee 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -27,6 +27,8 @@
#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.h>
+#include "adbc_validation_util.h"
+
namespace adbc_validation {
#define ADBCV_STRINGIFY(s) #s
@@ -160,6 +162,18 @@ class DriverQuirks {
return ingest_type;
}
+ /// \brief For a given Arrow type of (possibly nested) ingested data, what
Arrow type
+ /// will the database return when that column is selected?
+ virtual SchemaField IngestSelectRoundTripType(SchemaField ingest_field)
const {
+ SchemaField out(ingest_field.name,
IngestSelectRoundTripType(ingest_field.type),
+ ingest_field.nullable);
+ for (const auto& child : ingest_field.children) {
+ out.children.push_back(IngestSelectRoundTripType(child));
+ }
+
+ return out;
+ }
+
/// \brief Whether bulk ingest is supported
virtual bool supports_bulk_ingest(const char* mode) const { return true; }
@@ -377,6 +391,10 @@ class StatementTest {
// Dictionary-encoded
void TestSqlIngestStringDictionary();
+ // Nested
+ void TestSqlIngestListOfInt32();
+ void TestSqlIngestListOfString();
+
void TestSqlIngestStreamZeroArrays();
// ---- End Type-specific tests ----------------
@@ -440,6 +458,11 @@ class StatementTest {
struct AdbcConnection connection;
struct AdbcStatement statement;
+ template <typename CType>
+ void TestSqlIngestType(SchemaField type,
+ const std::vector<std::optional<CType>>& values,
+ bool dictionary_encode);
+
template <typename CType>
void TestSqlIngestType(ArrowType type, const
std::vector<std::optional<CType>>& values,
bool dictionary_encode);
@@ -480,6 +503,8 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestTimestampTz) { TestSqlIngestTimestampTz(); }
\
TEST_F(FIXTURE, SqlIngestInterval) { TestSqlIngestInterval(); }
\
TEST_F(FIXTURE, SqlIngestStringDictionary) {
TestSqlIngestStringDictionary(); } \
+ TEST_F(FIXTURE, SqlIngestListOfInt32) { TestSqlIngestListOfInt32(); }
\
+ TEST_F(FIXTURE, SqlIngestListOfString) { TestSqlIngestListOfString(); }
\
TEST_F(FIXTURE, TestSqlIngestStreamZeroArrays) {
TestSqlIngestStreamZeroArrays(); } \
TEST_F(FIXTURE, SqlIngestTableEscaping) { TestSqlIngestTableEscaping(); }
\
TEST_F(FIXTURE, SqlIngestColumnEscaping) { TestSqlIngestColumnEscaping(); }
\
diff --git a/c/validation/adbc_validation_connection.cc
b/c/validation/adbc_validation_connection.cc
index 80e1fbb7b..a885fa2c8 100644
--- a/c/validation/adbc_validation_connection.cc
+++ b/c/validation/adbc_validation_connection.cc
@@ -425,10 +425,10 @@ void CheckGetObjectsSchema(struct ArrowSchema* schema) {
{"constraint_column_names", NANOARROW_TYPE_LIST,
NOT_NULL},
{"constraint_column_usage", NANOARROW_TYPE_LIST,
NULLABLE},
}));
- ASSERT_NO_FATAL_FAILURE(CompareSchema(
- constraint_schema->children[2], {
- {std::nullopt,
NANOARROW_TYPE_STRING, NULLABLE},
- }));
+ ASSERT_NO_FATAL_FAILURE(CompareSchema(constraint_schema->children[2],
+ {
+ {"", NANOARROW_TYPE_STRING,
NULLABLE},
+ }));
struct ArrowSchema* usage_schema =
constraint_schema->children[3]->children[0];
ASSERT_NO_FATAL_FAILURE(
diff --git a/c/validation/adbc_validation_statement.cc
b/c/validation/adbc_validation_statement.cc
index 431620594..81b469698 100644
--- a/c/validation/adbc_validation_statement.cc
+++ b/c/validation/adbc_validation_statement.cc
@@ -79,9 +79,12 @@ void StatementTest::TestRelease() {
}
template <typename CType>
-void StatementTest::TestSqlIngestType(ArrowType type,
+void StatementTest::TestSqlIngestType(SchemaField field,
const std::vector<std::optional<CType>>&
values,
bool dictionary_encode) {
+ // Override the field name
+ field.name = "col";
+
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
@@ -92,7 +95,7 @@ void StatementTest::TestSqlIngestType(ArrowType type,
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_THAT(MakeSchema(&schema.value, {{"col", type}}), IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {field}), IsOkErrno());
ASSERT_THAT(MakeBatch<CType>(&schema.value, &array.value, &na_error, values),
IsOkErrno());
@@ -155,16 +158,15 @@ void StatementTest::TestSqlIngestType(ArrowType type,
::testing::AnyOf(::testing::Eq(values.size()),
::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
- ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type);
- ASSERT_NO_FATAL_FAILURE(
- CompareSchema(&reader.schema.value, {{"col", round_trip_type,
NULLABLE}}));
+ SchemaField round_trip_field = quirks()->IngestSelectRoundTripType(field);
+ ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{round_trip_field}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(values.size(), reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
- if (round_trip_type == type) {
+ if (round_trip_field.type == field.type) {
// XXX: for now we can't compare values; we would need casting
ASSERT_NO_FATAL_FAILURE(
CompareArray<CType>(reader.array_view->children[0], values));
@@ -176,6 +178,14 @@ void StatementTest::TestSqlIngestType(ArrowType type,
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
+template <typename CType>
+void StatementTest::TestSqlIngestType(ArrowType type,
+ const std::vector<std::optional<CType>>&
values,
+ bool dictionary_encode) {
+ SchemaField field("col", type);
+ TestSqlIngestType<CType>(field, values, dictionary_encode);
+}
+
template <typename CType>
void StatementTest::TestSqlIngestNumericType(ArrowType type) {
std::vector<std::optional<CType>> values = {
@@ -491,6 +501,24 @@ void StatementTest::TestSqlIngestStringDictionary() {
/*dictionary_encode*/
true));
}
+void StatementTest::TestSqlIngestListOfInt32() {
+ SchemaField field =
+ SchemaField::Nested("col", NANOARROW_TYPE_LIST, {{"item",
NANOARROW_TYPE_INT32}});
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<int32_t>>(
+ field, {std::nullopt, std::vector<int32_t>{1, 2, 3},
std::vector<int32_t>{4, 5}},
+ /*dictionary_encode*/ false));
+}
+
+void StatementTest::TestSqlIngestListOfString() {
+ SchemaField field =
+ SchemaField::Nested("col", NANOARROW_TYPE_LIST, {{"item",
NANOARROW_TYPE_STRING}});
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<std::string>>(
+ field,
+ {std::nullopt, std::vector<std::string>{"abc", "defg"},
+ std::vector<std::string>{"hijk"}},
+ /*dictionary_encode*/ false));
+}
+
void StatementTest::TestSqlIngestStreamZeroArrays() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
diff --git a/c/validation/adbc_validation_util.cc
b/c/validation/adbc_validation_util.cc
index 54c18cce7..91876dae3 100644
--- a/c/validation/adbc_validation_util.cc
+++ b/c/validation/adbc_validation_util.cc
@@ -165,16 +165,43 @@ void IsAdbcStatusCode::DescribeNegationTo(std::ostream*
os) const {
} \
} while (false);
+static int MakeSchemaColumnImpl(struct ArrowSchema* column, const SchemaField&
field) {
+ CHECK_ERRNO(ArrowSchemaSetType(column, field.type));
+ CHECK_ERRNO(ArrowSchemaSetName(column, field.name.c_str()));
+
+ if (!field.nullable) {
+ column->flags &= ~ARROW_FLAG_NULLABLE;
+ }
+
+ if (static_cast<size_t>(column->n_children) != field.children.size()) {
+ return EINVAL;
+ }
+
+ switch (field.type) {
+ // SetType for a list will allocate and initialize children
+ case NANOARROW_TYPE_LIST:
+ case NANOARROW_TYPE_LARGE_LIST:
+ case NANOARROW_TYPE_MAP: {
+ size_t i = 0;
+ for (const SchemaField& child : field.children) {
+ CHECK_ERRNO(MakeSchemaColumnImpl(column->children[i], child));
+ ++i;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ return 0;
+}
+
int MakeSchema(struct ArrowSchema* schema, const std::vector<SchemaField>&
fields) {
ArrowSchemaInit(schema);
CHECK_ERRNO(ArrowSchemaSetTypeStruct(schema, fields.size()));
size_t i = 0;
for (const SchemaField& field : fields) {
- CHECK_ERRNO(ArrowSchemaSetType(schema->children[i], field.type));
- CHECK_ERRNO(ArrowSchemaSetName(schema->children[i], field.name.c_str()));
- if (!field.nullable) {
- schema->children[i]->flags &= ~ARROW_FLAG_NULLABLE;
- }
+ CHECK_ERRNO(MakeSchemaColumnImpl(schema->children[i], field));
i++;
}
return 0;
@@ -244,9 +271,7 @@ void MakeStream(struct ArrowArrayStream* stream, struct
ArrowSchema* schema,
stream->private_data = new ConstantArrayStream(schema, std::move(batches));
}
-void CompareSchema(
- struct ArrowSchema* schema,
- const std::vector<std::tuple<std::optional<std::string>, ArrowType,
bool>>& fields) {
+void CompareSchema(struct ArrowSchema* schema, const std::vector<SchemaField>&
fields) {
struct ArrowError na_error;
struct ArrowSchemaView view;
@@ -261,12 +286,11 @@ void CompareSchema(
struct ArrowSchemaView field_view;
ASSERT_THAT(ArrowSchemaViewInit(&field_view, schema->children[i],
&na_error),
IsOkErrno(&na_error));
- ASSERT_EQ(std::get<1>(fields[i]), field_view.type);
- ASSERT_EQ(std::get<2>(fields[i]),
- (schema->children[i]->flags & ARROW_FLAG_NULLABLE) != 0)
+ ASSERT_EQ(fields[i].type, field_view.type);
+ ASSERT_EQ(fields[i].nullable, (schema->children[i]->flags &
ARROW_FLAG_NULLABLE) != 0)
<< "Nullability mismatch";
- if (std::get<0>(fields[i]).has_value()) {
- ASSERT_STRCASEEQ(std::get<0>(fields[i])->c_str(),
schema->children[i]->name);
+ if (fields[i].name != "") {
+ ASSERT_STRCASEEQ(fields[i].name.c_str(), schema->children[i]->name);
}
}
}
diff --git a/c/validation/adbc_validation_util.h
b/c/validation/adbc_validation_util.h
index 21eca52de..6f2546269 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -257,12 +257,20 @@ struct SchemaField {
std::string name;
ArrowType type = NANOARROW_TYPE_UNINITIALIZED;
bool nullable = true;
+ std::vector<SchemaField> children;
SchemaField(std::string name, ArrowType type, bool nullable)
: name(std::move(name)), type(type), nullable(nullable) {}
SchemaField(std::string name, ArrowType type)
: SchemaField(std::move(name), type, /*nullable=*/true) {}
+
+ static SchemaField Nested(std::string name, ArrowType type,
+ std::vector<SchemaField> children) {
+ SchemaField out(name, type);
+ out.children = std::move(children);
+ return out;
+ }
};
/// \brief Make a schema from a vector of (name, type, nullable) tuples.
@@ -303,6 +311,29 @@ int MakeArray(struct ArrowArray* parent, struct
ArrowArray* array,
CHECK_OK(ArrowArrayAppendInterval(array, *v));
} else if constexpr (std::is_same<T, ArrowDecimal*>::value) {
CHECK_OK(ArrowArrayAppendDecimal(array, *v));
+ } else if constexpr (
+ // Possibly a more effective way to do this using template magic
+ // Not included but possible are the std::optional<> variants of this
+ std::is_same<T, std::vector<bool>>::value ||
+ std::is_same<T, std::vector<int8_t>>::value ||
+ std::is_same<T, std::vector<int16_t>>::value ||
+ std::is_same<T, std::vector<int32_t>>::value ||
+ std::is_same<T, std::vector<int64_t>>::value ||
+ std::is_same<T, std::vector<uint8_t>>::value ||
+ std::is_same<T, std::vector<uint16_t>>::value ||
+ std::is_same<T, std::vector<uint32_t>>::value ||
+ std::is_same<T, std::vector<uint64_t>>::value ||
+ std::is_same<T, std::vector<double>>::value ||
+ std::is_same<T, std::vector<float>>::value ||
+ std::is_same<T, std::vector<std::string>>::value ||
+ std::is_same<T, std::vector<std::vector<std::byte>>>::value) {
+ using child_t = typename T::value_type;
+ std::vector<std::optional<child_t>> value_nullable;
+ for (const auto& child_value : *v) {
+ value_nullable.push_back(child_value);
+ }
+ CHECK_OK(MakeArray(array, array->children[0], value_nullable));
+ CHECK_OK(ArrowArrayFinishElement(array));
} else {
static_assert(!sizeof(T), "Not yet implemented");
return ENOTSUP;
@@ -359,9 +390,13 @@ void MakeStream(struct ArrowArrayStream* stream, struct
ArrowSchema* schema,
/// \brief Compare an array for equality against a vector of values.
template <typename T>
void CompareArray(struct ArrowArrayView* array,
- const std::vector<std::optional<T>>& values) {
- ASSERT_EQ(static_cast<int64_t>(values.size()), array->array->length);
- int64_t i = 0;
+ const std::vector<std::optional<T>>& values, int64_t offset
= 0,
+ int64_t length = -1) {
+ if (length == -1) {
+ length = array->length;
+ }
+ ASSERT_EQ(static_cast<int64_t>(values.size()), length);
+ int64_t i = offset;
for (const auto& v : values) {
SCOPED_TRACE("Array index " + std::to_string(i));
if (v.has_value()) {
@@ -421,6 +456,34 @@ void CompareArray(struct ArrowArrayView* array,
ASSERT_EQ(interval.months, (*v)->months);
ASSERT_EQ(interval.days, (*v)->days);
ASSERT_EQ(interval.ns, (*v)->ns);
+
+ } else if constexpr (
+ // Possibly a more effective way to do this using template magic
+ // Not included but possible are the std::optional<> variants of this
+ std::is_same<T, std::vector<bool>>::value ||
+ std::is_same<T, std::vector<int8_t>>::value ||
+ std::is_same<T, std::vector<int16_t>>::value ||
+ std::is_same<T, std::vector<int32_t>>::value ||
+ std::is_same<T, std::vector<int64_t>>::value ||
+ std::is_same<T, std::vector<uint8_t>>::value ||
+ std::is_same<T, std::vector<uint16_t>>::value ||
+ std::is_same<T, std::vector<uint32_t>>::value ||
+ std::is_same<T, std::vector<uint64_t>>::value ||
+ std::is_same<T, std::vector<double>>::value ||
+ std::is_same<T, std::vector<float>>::value ||
+ std::is_same<T, std::vector<std::string>>::value ||
+ std::is_same<T, std::vector<std::vector<std::byte>>>::value) {
+ using child_t = typename T::value_type;
+ std::vector<std::optional<child_t>> value_nullable;
+ for (const auto& child_value : *v) {
+ value_nullable.push_back(child_value);
+ }
+
+ SCOPED_TRACE("List item");
+ int64_t child_offset = ArrowArrayViewListChildOffset(array, i);
+ int64_t child_length = ArrowArrayViewListChildOffset(array, i + 1) -
child_offset;
+ CompareArray<child_t>(array->children[0], value_nullable, child_offset,
+ child_length);
} else {
static_assert(!sizeof(T), "Not yet implemented");
}
@@ -433,9 +496,7 @@ void CompareArray(struct ArrowArrayView* array,
/// \brief Compare a schema for equality against a vector of (name,
/// type, nullable) tuples.
-void CompareSchema(
- struct ArrowSchema* schema,
- const std::vector<std::tuple<std::optional<std::string>, ArrowType,
bool>>& fields);
+void CompareSchema(struct ArrowSchema* schema, const std::vector<SchemaField>&
fields);
/// \brief Helper method to get the vendor version of a driver
std::string GetDriverVendorVersion(struct AdbcConnection* connection);