This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 86996c7  [C] Add basic partitioned data test (#123)
86996c7 is described below

commit 86996c7e234f76b820bf2d209b1f599b87a1c3cc
Author: David Li <[email protected]>
AuthorDate: Tue Sep 13 14:38:47 2022 -0400

    [C] Add basic partitioned data test (#123)
---
 c/drivers/postgres/postgres.cc  | 55 +++++++++++++++++++++++++++++++++-----
 c/drivers/sqlite/sqlite.cc      | 38 ++++++++++++++++++++++++++
 c/validation/adbc_validation.cc | 59 +++++++++++++++++++++++++++++++++++++++++
 c/validation/adbc_validation.h  |  6 +++++
 4 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/c/drivers/postgres/postgres.cc b/c/drivers/postgres/postgres.cc
index 8760160..5804931 100644
--- a/c/drivers/postgres/postgres.cc
+++ b/c/drivers/postgres/postgres.cc
@@ -167,13 +167,13 @@ AdbcStatusCode PostgresConnectionNew(struct AdbcConnection* connection,
   return ADBC_STATUS_OK;
 }
 
-AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
-                                           const char* key, const char* value,
-                                           struct AdbcError* error) {
+AdbcStatusCode PostgresConnectionReadPartition(struct AdbcConnection* connection,
+                                               const uint8_t* serialized_partition,
+                                               size_t serialized_length,
+                                               struct ArrowArrayStream* out,
+                                               struct AdbcError* error) {
   if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
-  return (*ptr)->SetOption(key, value, error);
+  return ADBC_STATUS_NOT_IMPLEMENTED;
 }
 
 AdbcStatusCode PostgresConnectionRelease(struct AdbcConnection* connection,
@@ -194,6 +194,16 @@ AdbcStatusCode PostgresConnectionRollback(struct AdbcConnection* connection,
       reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
   return (*ptr)->Rollback(error);
 }
+
+AdbcStatusCode PostgresConnectionSetOption(struct AdbcConnection* connection,
+                                           const char* key, const char* value,
+                                           struct AdbcError* error) {
+  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+  auto ptr =
+      reinterpret_cast<std::shared_ptr<PostgresConnection>*>(connection->private_data);
+  return (*ptr)->SetOption(key, value, error);
+}
+
 }  // namespace
 AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
                                     struct AdbcError* error) {
@@ -244,6 +254,15 @@ AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
   return PostgresConnectionNew(connection, error);
 }
 
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+                                           const uint8_t* serialized_partition,
+                                           size_t serialized_length,
+                                           struct ArrowArrayStream* out,
+                                           struct AdbcError* error) {
+  return PostgresConnectionReadPartition(connection, serialized_partition,
+                                         serialized_length, out, error);
+}
+
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   return PostgresConnectionRelease(connection, error);
@@ -282,6 +301,17 @@ AdbcStatusCode PostgresStatementBindStream(struct AdbcStatement* statement,
   return (*ptr)->Bind(stream, error);
 }
 
+AdbcStatusCode PostgresStatementExecutePartitions(struct AdbcStatement* statement,
+                                                  struct ArrowSchema* schema,
+                                                  struct AdbcPartitions* partitions,
+                                                  int64_t* rows_affected,
+                                                  struct AdbcError* error) {
+  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+  auto* ptr =
+      reinterpret_cast<std::shared_ptr<PostgresStatement>*>(statement->private_data);
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
 AdbcStatusCode PostgresStatementExecuteQuery(struct AdbcStatement* statement,
                                              struct ArrowArrayStream* output,
                                              int64_t* rows_affected,
@@ -370,6 +400,15 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
   return PostgresStatementBindStream(statement, stream, error);
 }
 
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+                                              ArrowSchema* schema,
+                                              struct AdbcPartitions* partitions,
+                                              int64_t* rows_affected,
+                                              struct AdbcError* error) {
+  return PostgresStatementExecutePartitions(statement, schema, partitions, rows_affected,
+                                            error);
+}
+
 AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
                                          struct ArrowArrayStream* output,
                                          int64_t* rows_affected,
@@ -435,17 +474,19 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
 
   driver->ConnectionCommit = PostgresConnectionCommit;
   driver->ConnectionGetInfo = PostgresConnectionGetInfo;
-  // driver->ConnectionGetObjects = PostgresConnectionGetObjects;
+  driver->ConnectionGetObjects = PostgresConnectionGetObjects;
   driver->ConnectionGetTableSchema = PostgresConnectionGetTableSchema;
   driver->ConnectionGetTableTypes = PostgresConnectionGetTableTypes;
   driver->ConnectionInit = PostgresConnectionInit;
   driver->ConnectionNew = PostgresConnectionNew;
+  driver->ConnectionReadPartition = PostgresConnectionReadPartition;
   driver->ConnectionRelease = PostgresConnectionRelease;
   driver->ConnectionRollback = PostgresConnectionRollback;
   driver->ConnectionSetOption = PostgresConnectionSetOption;
 
   driver->StatementBind = PostgresStatementBind;
   driver->StatementBindStream = PostgresStatementBindStream;
+  driver->StatementExecutePartitions = PostgresStatementExecutePartitions;
   driver->StatementExecuteQuery = PostgresStatementExecuteQuery;
   driver->StatementGetParameterSchema = PostgresStatementGetParameterSchema;
   driver->StatementNew = PostgresStatementNew;
diff --git a/c/drivers/sqlite/sqlite.cc b/c/drivers/sqlite/sqlite.cc
index 268d090..06765dd 100644
--- a/c/drivers/sqlite/sqlite.cc
+++ b/c/drivers/sqlite/sqlite.cc
@@ -1468,6 +1468,15 @@ AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
   return ADBC_STATUS_OK;
 }
 
+AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
+                                             const uint8_t* serialized_partition,
+                                             size_t serialized_length,
+                                             struct ArrowArrayStream* out,
+                                             struct AdbcError* error) {
+  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
 AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
                                        struct AdbcError* error) {
   if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
@@ -1529,6 +1538,15 @@ AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
   return (*ptr)->Bind(*ptr, stream, error);
 }
 
+AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
+                                                struct ArrowSchema* schema,
+                                                struct AdbcPartitions* partitions,
+                                                int64_t* rows_affected,
+                                                struct AdbcError* error) {
+  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
 AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
                                            struct ArrowArrayStream* out,
                                            int64_t* rows_affected,
@@ -1674,6 +1692,15 @@ AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
   return SqliteConnectionNew(connection, error);
 }
 
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+                                           const uint8_t* serialized_partition,
+                                           size_t serialized_length,
+                                           struct ArrowArrayStream* out,
+                                           struct AdbcError* error) {
+  return SqliteConnectionReadPartition(connection, serialized_partition,
+                                       serialized_length, out, error);
+}
+
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   return SqliteConnectionRelease(connection, error);
@@ -1701,6 +1728,15 @@ AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
   return SqliteStatementBindStream(statement, stream, error);
 }
 
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+                                              ArrowSchema* schema,
+                                              struct AdbcPartitions* partitions,
+                                              int64_t* rows_affected,
+                                              struct AdbcError* error) {
+  return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
+                                          error);
+}
+
 AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
                                          struct ArrowArrayStream* out,
                                          int64_t* rows_affected,
@@ -1771,12 +1807,14 @@ AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* e
   driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
   driver->ConnectionInit = SqliteConnectionInit;
   driver->ConnectionNew = SqliteConnectionNew;
+  driver->ConnectionReadPartition = SqliteConnectionReadPartition;
   driver->ConnectionRelease = SqliteConnectionRelease;
   driver->ConnectionRollback = SqliteConnectionRollback;
   driver->ConnectionSetOption = SqliteConnectionSetOption;
 
   driver->StatementBind = SqliteStatementBind;
   driver->StatementBindStream = SqliteStatementBindStream;
+  driver->StatementExecutePartitions = SqliteStatementExecutePartitions;
   driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
   driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
   driver->StatementNew = SqliteStatementNew;
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 756e40e..492ef48 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1445,6 +1445,65 @@ void StatementTest::TestSqlIngestMultipleConnections() {
   }
 }
 
+void StatementTest::TestSqlPartitionedInts() {
+  ADBCV_ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
+  ADBCV_ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error));
+
+  struct ArrowSchema schema;
+  Handle<struct AdbcPartitions> partitions;
+  int64_t rows_affected = 0;
+
+  if (!quirks()->supports_partitioned_data()) {
+    ADBCV_ASSERT_FAILS_WITH(
+        NOT_IMPLEMENTED, &error,
+        AdbcStatementExecutePartitions(&statement, &schema, &partitions.value,
+                                       &rows_affected, &error));
+    return;
+  }
+
+  ADBCV_ASSERT_OK(&error,
+                  AdbcStatementExecutePartitions(&statement, &schema, &partitions.value,
+                                                 &rows_affected, &error));
+  // Assume only 1 partition
+  ASSERT_EQ(1, partitions->num_partitions);
+  ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
+  ASSERT_NE(nullptr, schema.release);
+  ASSERT_EQ(1, schema.n_children);
+
+  Handle<struct AdbcConnection> connection2;
+  StreamReader reader;
+  ADBCV_ASSERT_OK(&error, AdbcConnectionNew(&connection2.value, &error));
+  ADBCV_ASSERT_OK(&error, AdbcConnectionInit(&connection2.value, &database, &error));
+  ADBCV_ASSERT_OK(
+      &error, AdbcConnectionReadPartition(&connection2.value, partitions->partitions[0],
+                                          partitions->partition_lengths[0],
+                                          &reader.stream.value, &error));
+
+  ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+  ASSERT_EQ(1, reader.schema->n_children);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NE(nullptr, reader.array->release);
+  ASSERT_EQ(1, reader.array->length);
+  ASSERT_EQ(1, reader.array->n_children);
+
+  switch (reader.fields[0].data_type) {
+    case NANOARROW_TYPE_INT32:
+      ASSERT_NO_FATAL_FAILURE(
+          CompareArray<int32_t>(reader.array_view->children[0], {42}));
+      break;
+    case NANOARROW_TYPE_INT64:
+      ASSERT_NO_FATAL_FAILURE(
+          CompareArray<int64_t>(reader.array_view->children[0], {42}));
+      break;
+    default:
+      FAIL() << "Unexpected data type: " << reader.fields[0].data_type;
+  }
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
 void StatementTest::TestSqlPrepareSelectNoParams() {
   ADBCV_ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
   ADBCV_ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 1", &error));
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 26e5f7f..15bc5be 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -50,6 +50,9 @@ class DriverQuirks {
   /// \brief Whether two statements can be used at the same time on a
   ///   single connection
   virtual bool supports_concurrent_statements() const { return false; }
+
+  /// \brief Whether AdbcStatementExecutePartitions should work
+  virtual bool supports_partitioned_data() const { return false; }
 };
 
 class DatabaseTest {
@@ -143,6 +146,8 @@ class StatementTest {
   void TestSqlIngestErrors();
   void TestSqlIngestMultipleConnections();
 
+  void TestSqlPartitionedInts();
+
   void TestSqlPrepareSelectNoParams();
   void TestSqlPrepareSelectParams();
   void TestSqlPrepareUpdate();
@@ -178,6 +183,7 @@ class StatementTest {
   TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); }                           \
   TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); }                           \
   TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \
+  TEST_F(FIXTURE, SqlPartitionedInts) { TestSqlPartitionedInts(); }                     \
   TEST_F(FIXTURE, SqlPrepareSelectNoParams) { TestSqlPrepareSelectNoParams(); }         \
   TEST_F(FIXTURE, SqlPrepareSelectParams) { TestSqlPrepareSelectParams(); }             \
   TEST_F(FIXTURE, SqlPrepareUpdate) { TestSqlPrepareUpdate(); }                         \

Reply via email to