This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 86996c7 [C] Add basic partitioned data test (#123)
86996c7 is described below
commit 86996c7e234f76b820bf2d209b1f599b87a1c3cc
Author: David Li <[email protected]>
AuthorDate: Tue Sep 13 14:38:47 2022 -0400
[C] Add basic partitioned data test (#123)
---
c/drivers/postgres/postgres.cc | 55 +++++++++++++++++++++++++++++++++-----
c/drivers/sqlite/sqlite.cc | 38 ++++++++++++++++++++++++++
c/validation/adbc_validation.cc | 59 +++++++++++++++++++++++++++++++++++++++++
c/validation/adbc_validation.h | 6 +++++
4 files changed, 151 insertions(+), 7 deletions(-)
diff --git a/c/drivers/postgres/postgres.cc b/c/drivers/postgres/postgres.cc
index 8760160..5804931 100644
--- a/c/drivers/postgres/postgres.cc
+++ b/c/drivers/postgres/postgres.cc
@@ -167,13 +167,13 @@ AdbcStatusCode PostgresConnectionNew(struct AdbcConnection* connection,
return ADBC_STATUS_OK;
}
-AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
- const char* key, const char* value,
- struct AdbcError* error) {
+AdbcStatusCode PostgresConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
- return (*ptr)->SetOption(key, value, error);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
}
AdbcStatusCode PostgresConnectionRelease(struct AdbcConnection* connection,
@@ -194,6 +194,16 @@ AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection,
reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
return (*ptr)->Rollback(error);
}
+
+AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
+ const char* key, const char* value,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto ptr =
+ reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
+ return (*ptr)->SetOption(key, value, error);
+}
+
} // namespace
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
@@ -244,6 +254,15 @@ AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
return PostgresConnectionNew(connection, error);
}
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return PostgresConnectionReadPartition(connection, serialized_partition,
+ serialized_length, out, error);
+}
+
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
return PostgresConnectionRelease(connection, error);
@@ -282,6 +301,17 @@ AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement,
return (*ptr)->Bind(stream, error);
}
+AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ auto* ptr =
+ reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* output,
int64_t* rows_affected,
@@ -370,6 +400,15 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
return PostgresStatementBindStream(statement, stream, error);
}
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+ ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ return PostgresStatementExecutePartitions(statement, schema, partitions, rows_affected,
+ error);
+}
+
AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* output,
int64_t* rows_affected,
@@ -435,17 +474,19 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
driver->ConnectionCommit = PostgresConnectionCommit;
driver->ConnectionGetInfo = PostgresConnectionGetInfo;
- // driver->ConnectionGetObjects = PostgresConnectionGetObjects;
+ driver->ConnectionGetObjects = PostgresConnectionGetObjects;
driver->ConnectionGetTableSchema = PostgresConnectionGetTableSchema;
driver->ConnectionGetTableTypes = PostgresConnectionGetTableTypes;
driver->ConnectionInit = PostgresConnectionInit;
driver->ConnectionNew = PostgresConnectionNew;
+ driver->ConnectionReadPartition = PostgresConnectionReadPartition;
driver->ConnectionRelease = PostgresConnectionRelease;
driver->ConnectionRollback = PostgresConnectionRollback;
driver->ConnectionSetOption = PostgresConnectionSetOption;
driver->StatementBind = PostgresStatementBind;
driver->StatementBindStream = PostgresStatementBindStream;
+ driver->StatementExecutePartitions = PostgresStatementExecutePartitions;
driver->StatementExecuteQuery = PostgresStatementExecuteQuery;
driver->StatementGetParameterSchema = PostgresStatementGetParameterSchema;
driver->StatementNew = PostgresStatementNew;
diff --git a/c/drivers/sqlite/sqlite.cc b/c/drivers/sqlite/sqlite.cc
index 268d090..06765dd 100644
--- a/c/drivers/sqlite/sqlite.cc
+++ b/c/drivers/sqlite/sqlite.cc
@@ -1468,6 +1468,15 @@ AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
return ADBC_STATUS_OK;
}
+AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
@@ -1529,6 +1538,15 @@ AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
return (*ptr)->Bind(*ptr, stream, error);
}
+AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected,
@@ -1674,6 +1692,15 @@ AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
return SqliteConnectionNew(connection, error);
}
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return SqliteConnectionReadPartition(connection, serialized_partition,
+ serialized_length, out, error);
+}
+
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionRelease(connection, error);
@@ -1701,6 +1728,15 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
return SqliteStatementBindStream(statement, stream, error);
}
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+ ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
+ error);
+}
+
AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected,
@@ -1771,12 +1807,14 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
driver->ConnectionInit = SqliteConnectionInit;
driver->ConnectionNew = SqliteConnectionNew;
+ driver->ConnectionReadPartition = SqliteConnectionReadPartition;
driver->ConnectionRelease = SqliteConnectionRelease;
driver->ConnectionRollback = SqliteConnectionRollback;
driver->ConnectionSetOption = SqliteConnectionSetOption;
driver->StatementBind = SqliteStatementBind;
driver->StatementBindStream = SqliteStatementBindStream;
+ driver->StatementExecutePartitions = SqliteStatementExecutePartitions;
driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
driver->StatementNew = SqliteStatementNew;
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 756e40e..492ef48 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1445,6 +1445,65 @@ void StatementTest::TestSqlIngestMultipleConnections() {
}
}
+void StatementTest::TestSqlPartitionedInts() {
+ ADBCV_ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
+ ADBCV_ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error));
+
+ struct ArrowSchema schema;
+ Handle<struct AdbcPartitions> partitions;
+ int64_t rows_affected = 0;
+
+ if (!quirks()->supports_partitioned_data()) {
+ ADBCV_ASSERT_FAILS_WITH(
+ NOT_IMPLEMENTED, &error,
+ AdbcStatementExecutePartitions(&statement, &schema, &partitions.value,
+ &rows_affected, &error));
+ return;
+ }
+
+ ADBCV_ASSERT_OK(&error,
+ AdbcStatementExecutePartitions(&statement, &schema, &partitions.value,
+ &rows_affected, &error));
+ // Assume only 1 partition
+ ASSERT_EQ(1, partitions->num_partitions);
+ ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
+ ASSERT_NE(nullptr, schema.release);
+ ASSERT_EQ(1, schema.n_children);
+
+ Handle<struct AdbcConnection> connection2;
+ StreamReader reader;
+ ADBCV_ASSERT_OK(&error, AdbcConnectionNew(&connection2.value, &error));
+ ADBCV_ASSERT_OK(&error, AdbcConnectionInit(&connection2.value, &database, &error));
+ ADBCV_ASSERT_OK(
+ &error, AdbcConnectionReadPartition(&connection2.value, partitions->partitions[0],
+ partitions->partition_lengths[0],
+ &reader.stream.value, &error));
+
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ ASSERT_EQ(1, reader.schema->n_children);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NE(nullptr, reader.array->release);
+ ASSERT_EQ(1, reader.array->length);
+ ASSERT_EQ(1, reader.array->n_children);
+
+ switch (reader.fields[0].data_type) {
+ case NANOARROW_TYPE_INT32:
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int32_t>(reader.array_view->children[0], {42}));
+ break;
+ case NANOARROW_TYPE_INT64:
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {42}));
+ break;
+ default:
+ FAIL() << "Unexpected data type: " << reader.fields[0].data_type;
+ }
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
void StatementTest::TestSqlPrepareSelectNoParams() {
ADBCV_ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
ADBCV_ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 1", &error));
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 26e5f7f..15bc5be 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -50,6 +50,9 @@ class DriverQuirks {
/// \brief Whether two statements can be used at the same time on a
/// single connection
virtual bool supports_concurrent_statements() const { return false; }
+
+ /// \brief Whether AdbcStatementExecutePartitions should work
+ virtual bool supports_partitioned_data() const { return false; }
};
class DatabaseTest {
@@ -143,6 +146,8 @@ class StatementTest {
void TestSqlIngestErrors();
void TestSqlIngestMultipleConnections();
+ void TestSqlPartitionedInts();
+
void TestSqlPrepareSelectNoParams();
void TestSqlPrepareSelectParams();
void TestSqlPrepareUpdate();
@@ -178,6 +183,7 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); } \
TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); } \
TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \
+ TEST_F(FIXTURE, SqlPartitionedInts) { TestSqlPartitionedInts(); } \
TEST_F(FIXTURE, SqlPrepareSelectNoParams) { TestSqlPrepareSelectNoParams(); } \
TEST_F(FIXTURE, SqlPrepareSelectParams) { TestSqlPrepareSelectParams(); } \
TEST_F(FIXTURE, SqlPrepareUpdate) { TestSqlPrepareUpdate(); } \