This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 16ceb85ae feat(rust)!: return `Box<dyn RecordBatchReader + Send + 'static>` for
caller flexibility (#3904)
16ceb85ae is described below
commit 16ceb85aeb20e0cce94d5a1a150a7bb2af763cbd
Author: David Li <[email protected]>
AuthorDate: Tue Feb 10 08:59:54 2026 +0900
feat(rust)!: return `Box<dyn RecordBatchReader + Send + 'static>` for caller
flexibility (#3904)
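Returning `Box<dyn RecordBatchReader + Send + 'static>` instead of
`impl RecordBatchReader + Send` avoids inadvertently tying the result's
lifetime to the lifetimes of any input arguments (including `&self`), and
fully type-erases the result.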
Closes #2694.
---
rust/core/src/sync.rs | 20 +++++++++----------
rust/driver/datafusion/src/lib.rs | 23 ++++++++++++----------
rust/driver/dummy/src/lib.rs | 34 +++++++++++++++++++--------------
rust/driver/snowflake/src/connection.rs | 18 +++++++++++------
rust/driver/snowflake/src/statement.rs | 2 +-
rust/driver_manager/src/lib.rs | 34 +++++++++++++++++++--------------
6 files changed, 76 insertions(+), 55 deletions(-)
diff --git a/rust/core/src/sync.rs b/rust/core/src/sync.rs
index 400eae998..de26918b9 100644
--- a/rust/core/src/sync.rs
+++ b/rust/core/src/sync.rs
@@ -125,7 +125,7 @@ pub trait Connection: Optionable<Option = OptionConnection>
{
fn get_info(
&self,
codes: Option<HashSet<options::InfoCode>>,
- ) -> Result<impl RecordBatchReader + Send>;
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
/// Get a hierarchical view of all catalogs, database schemas, tables, and
/// columns.
@@ -233,7 +233,7 @@ pub trait Connection: Optionable<Option = OptionConnection>
{
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
- ) -> Result<impl RecordBatchReader + Send>;
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
/// Get the Arrow schema of a table.
///
@@ -258,7 +258,7 @@ pub trait Connection: Optionable<Option = OptionConnection>
{
/// Field Name | Field Type
/// ---------------|--------------
/// table_type | utf8 not null
- fn get_table_types(&self) -> Result<impl RecordBatchReader + Send>;
+ fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>>;
/// Get the names of statistics specific to this driver.
///
@@ -273,7 +273,7 @@ pub trait Connection: Optionable<Option = OptionConnection>
{
///
/// # Since
/// ADBC API revision 1.1.0
- fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send>;
+ fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>>;
/// Get statistics about the data distribution of table(s).
///
@@ -339,7 +339,7 @@ pub trait Connection: Optionable<Option = OptionConnection>
{
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
- ) -> Result<impl RecordBatchReader + Send>;
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
/// Commit any pending transactions. Only used if autocommit is disabled.
///
@@ -358,7 +358,10 @@ pub trait Connection: Optionable<Option =
OptionConnection> {
/// # Arguments
///
/// - `partition` - The partition descriptor.
- fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl
RecordBatchReader + Send>;
+ fn read_partition(
+ &self,
+ partition: impl AsRef<[u8]>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
}
/// A handle to an ADBC statement.
@@ -391,10 +394,7 @@ pub trait Statement: Optionable<Option = OptionStatement> {
/// Execute a statement and get the results.
///
/// This invalidates any prior result sets.
- // TODO(alexandreyc): is the Send bound absolutely necessary? same question
- // for all methods that return an impl RecordBatchReader
- // See:
https://github.com/apache/arrow-adbc/pull/1725#discussion_r1567748242
- fn execute(&mut self) -> Result<impl RecordBatchReader + Send>;
+ fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send +
'static>>;
/// Execute a statement that doesn’t have a result set and get the number
/// of affected rows.
diff --git a/rust/driver/datafusion/src/lib.rs
b/rust/driver/datafusion/src/lib.rs
index c08ba28c7..0db317a72 100644
--- a/rust/driver/datafusion/src/lib.rs
+++ b/rust/driver/datafusion/src/lib.rs
@@ -742,7 +742,7 @@ impl Connection for DataFusionConnection {
fn get_info(
&self,
codes: Option<std::collections::HashSet<adbc_core::options::InfoCode>>,
- ) -> Result<impl RecordBatchReader + Send> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let mut get_info_builder = GetInfoBuilder::new();
codes.unwrap().into_iter().for_each(|f| match f {
@@ -755,7 +755,7 @@ impl Connection for DataFusionConnection {
let batch = get_info_builder.finish()?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_objects(
@@ -766,10 +766,10 @@ impl Connection for DataFusionConnection {
_table_name: Option<&str>,
_table_type: Option<Vec<&str>>,
_column_name: Option<&str>,
- ) -> Result<impl RecordBatchReader + Send> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let batch = GetObjectsBuilder::new().build(&self.runtime, &self.ctx,
&depth)?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_table_schema(
@@ -781,11 +781,11 @@ impl Connection for DataFusionConnection {
todo!()
}
- fn get_table_types(&self) -> Result<SingleBatchReader> {
+ fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
todo!()
}
- fn get_statistic_names(&self) -> Result<SingleBatchReader> {
+ fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
todo!()
}
@@ -795,7 +795,7 @@ impl Connection for DataFusionConnection {
_db_schema: Option<&str>,
_table_name: Option<&str>,
_approximate: bool,
- ) -> Result<SingleBatchReader> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}
@@ -807,7 +807,10 @@ impl Connection for DataFusionConnection {
todo!()
}
- fn read_partition(&self, _partition: impl AsRef<[u8]>) ->
Result<SingleBatchReader> {
+ fn read_partition(
+ &self,
+ _partition: impl AsRef<[u8]>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
todo!()
}
}
@@ -901,7 +904,7 @@ impl Statement for DataFusionStatement {
todo!()
}
- fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
+ fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send>> {
self.runtime.block_on(async {
let df = if self.sql_query.is_some() {
self.ctx
@@ -916,7 +919,7 @@ impl Statement for DataFusionStatement {
self.ctx.execute_logical_plan(plan).await.unwrap()
};
- Ok(DataFusionReader::new(df).await)
+ Ok(Box::new(DataFusionReader::new(df).await) as Box<dyn
RecordBatchReader + Send>)
})
}
diff --git a/rust/driver/dummy/src/lib.rs b/rust/driver/dummy/src/lib.rs
index a6e9e0eb7..72c81f178 100644
--- a/rust/driver/dummy/src/lib.rs
+++ b/rust/driver/dummy/src/lib.rs
@@ -310,7 +310,10 @@ impl Connection for DummyConnection {
Ok(())
}
- fn get_info(&self, _codes: Option<HashSet<InfoCode>>) -> Result<impl
RecordBatchReader> {
+ fn get_info(
+ &self,
+ _codes: Option<HashSet<InfoCode>>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let string_value_array = StringArray::from(vec!["MyVendorName"]);
let bool_value_array = BooleanArray::from(vec![true]);
let int64_value_array = Int64Array::from(vec![42]);
@@ -407,7 +410,7 @@ impl Connection for DummyConnection {
vec![Arc::new(name_array), Arc::new(value_array)],
)?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_objects(
@@ -418,7 +421,7 @@ impl Connection for DummyConnection {
_table_name: Option<&str>,
_table_type: Option<Vec<&str>>,
_column_name: Option<&str>,
- ) -> Result<impl RecordBatchReader> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let constraint_column_usage_array_inner = StructArray::from(vec![
(
Arc::new(Field::new("fk_catalog", DataType::Utf8, true)),
@@ -645,7 +648,7 @@ impl Connection for DummyConnection {
],
)?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_statistics(
@@ -654,7 +657,7 @@ impl Connection for DummyConnection {
_db_schema: Option<&str>,
_table_name: Option<&str>,
_approximate: bool,
- ) -> Result<impl RecordBatchReader> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let statistic_value_int64_array = Int64Array::from(Vec::<i64>::new());
let statistic_value_uint64_array = UInt64Array::from(vec![42]);
let statistic_value_float64_array =
Float64Array::from(Vec::<f64>::new());
@@ -759,10 +762,10 @@ impl Connection for DummyConnection {
)?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
- fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
+ fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
let name_array = StringArray::from(vec!["sum", "min", "max"]);
let key_array = Int16Array::from(vec![0, 1, 2]);
let batch = RecordBatch::try_new(
@@ -770,7 +773,7 @@ impl Connection for DummyConnection {
vec![Arc::new(name_array), Arc::new(key_array)],
)?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_table_schema(
@@ -792,17 +795,20 @@ impl Connection for DummyConnection {
}
}
- fn get_table_types(&self) -> Result<impl RecordBatchReader> {
+ fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
let array = Arc::new(StringArray::from(vec!["table", "view"]));
let batch =
RecordBatch::try_new(schemas::GET_TABLE_TYPES_SCHEMA.clone(), vec![array])?;
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
- fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<impl
RecordBatchReader> {
+ fn read_partition(
+ &self,
+ _partition: impl AsRef<[u8]>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let batch = get_table_data();
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn rollback(&mut self) -> Result<()> {
@@ -852,11 +858,11 @@ impl Statement for DummyStatement {
Ok(())
}
- fn execute(&mut self) -> Result<impl RecordBatchReader> {
+ fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
maybe_panic("StatementExecuteQuery");
let batch = get_table_data();
let reader = SingleBatchReader::new(batch);
- Ok(reader)
+ Ok(Box::new(reader))
}
fn execute_partitions(&mut self) -> Result<PartitionedResult> {
diff --git a/rust/driver/snowflake/src/connection.rs
b/rust/driver/snowflake/src/connection.rs
index cc0ba51d9..2e916ac4b 100644
--- a/rust/driver/snowflake/src/connection.rs
+++ b/rust/driver/snowflake/src/connection.rs
@@ -74,7 +74,10 @@ impl adbc_core::Connection for Connection {
self.0.cancel()
}
- fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl
RecordBatchReader + Send> {
+ fn get_info(
+ &self,
+ codes: Option<HashSet<InfoCode>>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_info(codes)
}
@@ -86,7 +89,7 @@ impl adbc_core::Connection for Connection {
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
- ) -> Result<impl RecordBatchReader + Send> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.get_objects(
depth,
catalog,
@@ -106,11 +109,11 @@ impl adbc_core::Connection for Connection {
self.0.get_table_schema(catalog, db_schema, table_name)
}
- fn get_table_types(&self) -> Result<impl RecordBatchReader + Send> {
+ fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
self.0.get_table_types()
}
- fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send> {
+ fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
self.0.get_statistic_names()
}
@@ -120,7 +123,7 @@ impl adbc_core::Connection for Connection {
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
- ) -> Result<impl RecordBatchReader + Send> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0
.get_statistics(catalog, db_schema, table_name, approximate)
}
@@ -133,7 +136,10 @@ impl adbc_core::Connection for Connection {
self.0.rollback()
}
- fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl
RecordBatchReader + Send> {
+ fn read_partition(
+ &self,
+ partition: impl AsRef<[u8]>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
self.0.read_partition(partition)
}
}
diff --git a/rust/driver/snowflake/src/statement.rs
b/rust/driver/snowflake/src/statement.rs
index eb41d1108..f956852b2 100644
--- a/rust/driver/snowflake/src/statement.rs
+++ b/rust/driver/snowflake/src/statement.rs
@@ -64,7 +64,7 @@ impl adbc_core::Statement for Statement {
self.0.bind_stream(reader)
}
- fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
+ fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
self.0.execute()
}
diff --git a/rust/driver_manager/src/lib.rs b/rust/driver_manager/src/lib.rs
index 36389b47d..b2cc03f3a 100644
--- a/rust/driver_manager/src/lib.rs
+++ b/rust/driver_manager/src/lib.rs
@@ -1158,7 +1158,10 @@ impl Connection for ManagedConnection {
check_status(status, error)
}
- fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl
RecordBatchReader> {
+ fn get_info(
+ &self,
+ codes: Option<HashSet<InfoCode>>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let mut stream = FFI_ArrowArrayStream::empty();
let codes: Option<Vec<u32>> =
codes.map(|codes| codes.iter().map(|code| code.into()).collect());
@@ -1181,7 +1184,7 @@ impl Connection for ManagedConnection {
};
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_objects(
@@ -1192,7 +1195,7 @@ impl Connection for ManagedConnection {
table_name: Option<&str>,
table_type: Option<Vec<&str>>,
column_name: Option<&str>,
- ) -> Result<impl RecordBatchReader> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let catalog = catalog.map(CString::new).transpose()?;
let db_schema = db_schema.map(CString::new).transpose()?;
let table_name = table_name.map(CString::new).transpose()?;
@@ -1244,7 +1247,7 @@ impl Connection for ManagedConnection {
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_statistics(
@@ -1253,7 +1256,7 @@ impl Connection for ManagedConnection {
db_schema: Option<&str>,
table_name: Option<&str>,
approximate: bool,
- ) -> Result<impl RecordBatchReader> {
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
if let AdbcVersion::V100 = self.driver_version() {
return Err(Error::with_message_and_status(
ERR_STATISTICS_UNSUPPORTED,
@@ -1287,10 +1290,10 @@ impl Connection for ManagedConnection {
};
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
- fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
+ fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
if let AdbcVersion::V100 = self.driver_version() {
return Err(Error::with_message_and_status(
ERR_STATISTICS_UNSUPPORTED,
@@ -1305,7 +1308,7 @@ impl Connection for ManagedConnection {
let status = unsafe { method(connection.deref_mut(), &mut stream, &mut
error) };
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
fn get_table_schema(
@@ -1341,7 +1344,7 @@ impl Connection for ManagedConnection {
Ok((&schema).try_into()?)
}
- fn get_table_types(&self) -> Result<impl RecordBatchReader> {
+ fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
let mut stream = FFI_ArrowArrayStream::empty();
let driver = self.ffi_driver();
let mut connection = self.inner.connection.lock().unwrap();
@@ -1350,10 +1353,13 @@ impl Connection for ManagedConnection {
let status = unsafe { method(connection.deref_mut(), &mut stream, &mut
error) };
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
- fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl
RecordBatchReader> {
+ fn read_partition(
+ &self,
+ partition: impl AsRef<[u8]>,
+ ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
let mut stream = FFI_ArrowArrayStream::empty();
let driver = self.ffi_driver();
let mut connection = self.inner.connection.lock().unwrap();
@@ -1371,7 +1377,7 @@ impl Connection for ManagedConnection {
};
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
}
@@ -1434,7 +1440,7 @@ impl Statement for ManagedStatement {
check_status(status, error)
}
- fn execute(&mut self) -> Result<impl RecordBatchReader> {
+ fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send +
'static>> {
let driver = self.ffi_driver();
let mut statement = self.inner.statement.lock().unwrap();
let mut error = adbc_ffi::FFI_AdbcError::with_driver(driver);
@@ -1443,7 +1449,7 @@ impl Statement for ManagedStatement {
let status = unsafe { method(statement.deref_mut(), &mut stream,
null_mut(), &mut error) };
check_status(status, error)?;
let reader = ArrowArrayStreamReader::try_new(stream)?;
- Ok(reader)
+ Ok(Box::new(reader))
}
fn execute_schema(&mut self) -> Result<arrow_schema::Schema> {