mbrobbel commented on code in PR #1803: URL: https://github.com/apache/arrow-adbc/pull/1803#discussion_r1592419258
########## rust/core/src/driver_manager.rs: ########## @@ -0,0 +1,1218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Load and use ADBC drivers. +//! +//! The driver manager provides an implementation of the ADBC interface which +//! uses FFI to wrap an object file implementation of +//! [`adbc.h`](https://github.com/apache/arrow-adbc/blob/main/adbc.h). +//! +//! There are three ways that drivers can be loaded: +//! 1. By statically linking the driver implementation using +//! [ManagedDriver::load_static]. +//! 2. By dynamically linking the driver implementation at build time, also +//! using [ManagedDriver::load_static]. +//! 3. By loading the driver implementation at runtime (with +//! `dlopen/LoadLibrary`) using [ManagedDriver::load_dynamic]. +//! +//! Drivers are initialized using a function provided by the driver as a main +//! entrypoint, canonically called `AdbcDriverInit`. However, many drivers use a +//! different name to support statically linking multiple drivers within the +//! same program. +//! +//! ## Using across threads +//! +//! [ManagedDriver], [ManagedDatabase], [ManagedConnection] and [ManagedStatement] +//! can be used across threads, though all of their operations are serialized +//! under the hood.
They hold their inner implementations within [std::sync::Arc], +//! so they are cheaply clonable. +//! +//! ## Example +//! +//! ```rust +//! # use std::sync::Arc; +//! # use arrow::{ +//! # array::{Array, StringArray, Int64Array, Float64Array}, +//! # record_batch::{RecordBatch, RecordBatchReader}, +//! # datatypes::{Field, Schema, DataType}, +//! # compute::concat_batches, +//! # }; +//! # use adbc_core::{ +//! # driver_manager::ManagedDriver, +//! # options::{AdbcVersion, OptionDatabase, OptionStatement}, +//! # Connection, Database, Driver, Statement, Optionable +//! # }; +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let opts = [(OptionDatabase::Uri, ":memory:".into())]; +//! let mut driver = ManagedDriver::load_dynamic("adbc_driver_sqlite", None, AdbcVersion::V100)?; +//! let mut database = driver.new_database_with_opts(opts)?; +//! let mut connection = database.new_connection()?; +//! let mut statement = connection.new_statement()?; +//! +//! // Define some data. +//! # let columns: Vec<Arc<dyn Array>> = vec![ +//! # Arc::new(Int64Array::from(vec![1, 2, 3, 4])), +//! # Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), +//! # Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), +//! # ]; +//! # let schema = Schema::new(vec![ +//! # Field::new("a", DataType::Int64, true), +//! # Field::new("b", DataType::Float64, true), +//! # Field::new("c", DataType::Utf8, true), +//! # ]); +//! let input: RecordBatch = RecordBatch::try_new(Arc::new(schema), columns)?; +//! +//! // Ingest data. +//! statement.set_option(OptionStatement::TargetTable, "my_table".into())?; +//! statement.bind(input.clone())?; +//! statement.execute_update()?; +//! +//! // Extract data. +//! statement.set_sql_query("select * from my_table")?; +//! let output = statement.execute()?; +//! let schema = output.schema(); +//! let output: Result<Vec<RecordBatch>, _> = output.collect(); +//! let output = concat_batches(&schema, &output?)?; +//! assert_eq!(input, output); +//! 
+//! # Ok(()) +//! # } +//! ``` + +// According to the ADBC specification, objects allow serialized access from +// multiple threads: one thread may make a call, and once finished, another +// thread may make a call. They do not allow concurrent access from multiple +// threads. +// +// In order to implement these semantics, all FFI objects are wrapped in a +// `Mutex`. As a result, some operations must hold multiple locks at once, so +// care must be taken to avoid deadlock, in particular "lock inversion". +// The general convention chosen here is to first acquire the lock on the driver +// and then acquire the lock on the specific object being operated on. + +use std::collections::HashSet; +use std::ffi::{CStr, CString}; +use std::ops::DerefMut; +use std::os::raw::{c_char, c_void}; +use std::ptr::{null, null_mut}; +use std::sync::{Arc, Mutex}; + +use arrow::array::{Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow::ffi::{to_ffi, FFI_ArrowSchema}; +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; + +use crate::{ + error::{Error, Status}, + options::{self, AdbcVersion, InfoCode, OptionValue}, + PartitionedResult, Result, +}; +use crate::{ffi, ffi::types::driver_method, Optionable}; +use crate::{Connection, Database, Driver, Statement}; + +const ERR_ONLY_STRING_OPT: &str = "Only string option value are supported with ADBC 1.0.0"; +const ERR_CANCEL_UNSUPPORTED: &str = + "Canceling connection or statement is not supported with ADBC 1.0.0"; +const ERR_STATISTICS_UNSUPPORTED: &str = "Statistics are not supported with ADBC 1.0.0"; + +fn check_status(status: ffi::FFI_AdbcStatusCode, error: ffi::FFI_AdbcError) -> Result<()> { + match status { + ffi::constants::ADBC_STATUS_OK => Ok(()), + _ => { + let mut error: Error = error.try_into()?; + error.status = status.try_into()?; + Err(error) + } + } +} + +impl From<libloading::Error> for Error { + fn from(value: libloading::Error) -> Self { + Self { + message: format!("Error with dynamic 
library: {value}"), + status: Status::Internal, + vendor_code: 0, + sqlstate: [0; 5], + details: None, + } + } +} + +struct DriverManagerInner { + driver: Mutex<ffi::FFI_AdbcDriver>, + version: AdbcVersion, // Driver version + _library: Option<libloading::Library>, Review Comment: Maybe we can make this explicit by unloading (https://docs.rs/libloading/latest/libloading/struct.Library.html#method.close) the library in the Drop impl? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
