This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 16ceb85ae feat(rust)!: return `Box<RecordBatchReader + 'static>` for 
caller flexibility (#3904)
16ceb85ae is described below

commit 16ceb85aeb20e0cce94d5a1a150a7bb2af763cbd
Author: David Li <[email protected]>
AuthorDate: Tue Feb 10 08:59:54 2026 +0900

    feat(rust)!: return `Box<RecordBatchReader + 'static>` for caller 
flexibility (#3904)
    
    This avoids inadvertently tying the result lifetime to the lifetimes of
    any input arguments, and fully type-erases the result.
    
    Closes #2694.
---
 rust/core/src/sync.rs                   | 20 +++++++++----------
 rust/driver/datafusion/src/lib.rs       | 23 ++++++++++++----------
 rust/driver/dummy/src/lib.rs            | 34 +++++++++++++++++++--------------
 rust/driver/snowflake/src/connection.rs | 18 +++++++++++------
 rust/driver/snowflake/src/statement.rs  |  2 +-
 rust/driver_manager/src/lib.rs          | 34 +++++++++++++++++++--------------
 6 files changed, 76 insertions(+), 55 deletions(-)

diff --git a/rust/core/src/sync.rs b/rust/core/src/sync.rs
index 400eae998..de26918b9 100644
--- a/rust/core/src/sync.rs
+++ b/rust/core/src/sync.rs
@@ -125,7 +125,7 @@ pub trait Connection: Optionable<Option = OptionConnection> 
{
     fn get_info(
         &self,
         codes: Option<HashSet<options::InfoCode>>,
-    ) -> Result<impl RecordBatchReader + Send>;
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
 
     /// Get a hierarchical view of all catalogs, database schemas, tables, and
     /// columns.
@@ -233,7 +233,7 @@ pub trait Connection: Optionable<Option = OptionConnection> 
{
         table_name: Option<&str>,
         table_type: Option<Vec<&str>>,
         column_name: Option<&str>,
-    ) -> Result<impl RecordBatchReader + Send>;
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
 
     /// Get the Arrow schema of a table.
     ///
@@ -258,7 +258,7 @@ pub trait Connection: Optionable<Option = OptionConnection> 
{
     /// Field Name     | Field Type
     /// ---------------|--------------
     /// table_type     | utf8 not null
-    fn get_table_types(&self) -> Result<impl RecordBatchReader + Send>;
+    fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>>;
 
     /// Get the names of statistics specific to this driver.
     ///
@@ -273,7 +273,7 @@ pub trait Connection: Optionable<Option = OptionConnection> 
{
     ///
     /// # Since
     /// ADBC API revision 1.1.0
-    fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send>;
+    fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>>;
 
     /// Get statistics about the data distribution of table(s).
     ///
@@ -339,7 +339,7 @@ pub trait Connection: Optionable<Option = OptionConnection> 
{
         db_schema: Option<&str>,
         table_name: Option<&str>,
         approximate: bool,
-    ) -> Result<impl RecordBatchReader + Send>;
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
 
     /// Commit any pending transactions. Only used if autocommit is disabled.
     ///
@@ -358,7 +358,10 @@ pub trait Connection: Optionable<Option = 
OptionConnection> {
     /// # Arguments
     ///
     /// - `partition` - The partition descriptor.
-    fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl 
RecordBatchReader + Send>;
+    fn read_partition(
+        &self,
+        partition: impl AsRef<[u8]>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>>;
 }
 
 /// A handle to an ADBC statement.
@@ -391,10 +394,7 @@ pub trait Statement: Optionable<Option = OptionStatement> {
     /// Execute a statement and get the results.
     ///
     /// This invalidates any prior result sets.
-    // TODO(alexandreyc): is the Send bound absolutely necessary? same question
-    // for all methods that return an impl RecordBatchReader
-    // See: 
https://github.com/apache/arrow-adbc/pull/1725#discussion_r1567748242
-    fn execute(&mut self) -> Result<impl RecordBatchReader + Send>;
+    fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>>;
 
     /// Execute a statement that doesn’t have a result set and get the number
     /// of affected rows.
diff --git a/rust/driver/datafusion/src/lib.rs 
b/rust/driver/datafusion/src/lib.rs
index c08ba28c7..0db317a72 100644
--- a/rust/driver/datafusion/src/lib.rs
+++ b/rust/driver/datafusion/src/lib.rs
@@ -742,7 +742,7 @@ impl Connection for DataFusionConnection {
     fn get_info(
         &self,
         codes: Option<std::collections::HashSet<adbc_core::options::InfoCode>>,
-    ) -> Result<impl RecordBatchReader + Send> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let mut get_info_builder = GetInfoBuilder::new();
 
         codes.unwrap().into_iter().for_each(|f| match f {
@@ -755,7 +755,7 @@ impl Connection for DataFusionConnection {
 
         let batch = get_info_builder.finish()?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_objects(
@@ -766,10 +766,10 @@ impl Connection for DataFusionConnection {
         _table_name: Option<&str>,
         _table_type: Option<Vec<&str>>,
         _column_name: Option<&str>,
-    ) -> Result<impl RecordBatchReader + Send> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let batch = GetObjectsBuilder::new().build(&self.runtime, &self.ctx, 
&depth)?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_table_schema(
@@ -781,11 +781,11 @@ impl Connection for DataFusionConnection {
         todo!()
     }
 
-    fn get_table_types(&self) -> Result<SingleBatchReader> {
+    fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         todo!()
     }
 
-    fn get_statistic_names(&self) -> Result<SingleBatchReader> {
+    fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         todo!()
     }
 
@@ -795,7 +795,7 @@ impl Connection for DataFusionConnection {
         _db_schema: Option<&str>,
         _table_name: Option<&str>,
         _approximate: bool,
-    ) -> Result<SingleBatchReader> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         todo!()
     }
 
@@ -807,7 +807,10 @@ impl Connection for DataFusionConnection {
         todo!()
     }
 
-    fn read_partition(&self, _partition: impl AsRef<[u8]>) -> 
Result<SingleBatchReader> {
+    fn read_partition(
+        &self,
+        _partition: impl AsRef<[u8]>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         todo!()
     }
 }
@@ -901,7 +904,7 @@ impl Statement for DataFusionStatement {
         todo!()
     }
 
-    fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
+    fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send>> {
         self.runtime.block_on(async {
             let df = if self.sql_query.is_some() {
                 self.ctx
@@ -916,7 +919,7 @@ impl Statement for DataFusionStatement {
                 self.ctx.execute_logical_plan(plan).await.unwrap()
             };
 
-            Ok(DataFusionReader::new(df).await)
+            Ok(Box::new(DataFusionReader::new(df).await) as Box<dyn 
RecordBatchReader + Send>)
         })
     }
 
diff --git a/rust/driver/dummy/src/lib.rs b/rust/driver/dummy/src/lib.rs
index a6e9e0eb7..72c81f178 100644
--- a/rust/driver/dummy/src/lib.rs
+++ b/rust/driver/dummy/src/lib.rs
@@ -310,7 +310,10 @@ impl Connection for DummyConnection {
         Ok(())
     }
 
-    fn get_info(&self, _codes: Option<HashSet<InfoCode>>) -> Result<impl 
RecordBatchReader> {
+    fn get_info(
+        &self,
+        _codes: Option<HashSet<InfoCode>>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let string_value_array = StringArray::from(vec!["MyVendorName"]);
         let bool_value_array = BooleanArray::from(vec![true]);
         let int64_value_array = Int64Array::from(vec![42]);
@@ -407,7 +410,7 @@ impl Connection for DummyConnection {
             vec![Arc::new(name_array), Arc::new(value_array)],
         )?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_objects(
@@ -418,7 +421,7 @@ impl Connection for DummyConnection {
         _table_name: Option<&str>,
         _table_type: Option<Vec<&str>>,
         _column_name: Option<&str>,
-    ) -> Result<impl RecordBatchReader> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let constraint_column_usage_array_inner = StructArray::from(vec![
             (
                 Arc::new(Field::new("fk_catalog", DataType::Utf8, true)),
@@ -645,7 +648,7 @@ impl Connection for DummyConnection {
             ],
         )?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_statistics(
@@ -654,7 +657,7 @@ impl Connection for DummyConnection {
         _db_schema: Option<&str>,
         _table_name: Option<&str>,
         _approximate: bool,
-    ) -> Result<impl RecordBatchReader> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let statistic_value_int64_array = Int64Array::from(Vec::<i64>::new());
         let statistic_value_uint64_array = UInt64Array::from(vec![42]);
         let statistic_value_float64_array = 
Float64Array::from(Vec::<f64>::new());
@@ -759,10 +762,10 @@ impl Connection for DummyConnection {
         )?;
 
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
-    fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
+    fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         let name_array = StringArray::from(vec!["sum", "min", "max"]);
         let key_array = Int16Array::from(vec![0, 1, 2]);
         let batch = RecordBatch::try_new(
@@ -770,7 +773,7 @@ impl Connection for DummyConnection {
             vec![Arc::new(name_array), Arc::new(key_array)],
         )?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_table_schema(
@@ -792,17 +795,20 @@ impl Connection for DummyConnection {
         }
     }
 
-    fn get_table_types(&self) -> Result<impl RecordBatchReader> {
+    fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         let array = Arc::new(StringArray::from(vec!["table", "view"]));
         let batch = 
RecordBatch::try_new(schemas::GET_TABLE_TYPES_SCHEMA.clone(), vec![array])?;
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
-    fn read_partition(&self, _partition: impl AsRef<[u8]>) -> Result<impl 
RecordBatchReader> {
+    fn read_partition(
+        &self,
+        _partition: impl AsRef<[u8]>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let batch = get_table_data();
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn rollback(&mut self) -> Result<()> {
@@ -852,11 +858,11 @@ impl Statement for DummyStatement {
         Ok(())
     }
 
-    fn execute(&mut self) -> Result<impl RecordBatchReader> {
+    fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         maybe_panic("StatementExecuteQuery");
         let batch = get_table_data();
         let reader = SingleBatchReader::new(batch);
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn execute_partitions(&mut self) -> Result<PartitionedResult> {
diff --git a/rust/driver/snowflake/src/connection.rs 
b/rust/driver/snowflake/src/connection.rs
index cc0ba51d9..2e916ac4b 100644
--- a/rust/driver/snowflake/src/connection.rs
+++ b/rust/driver/snowflake/src/connection.rs
@@ -74,7 +74,10 @@ impl adbc_core::Connection for Connection {
         self.0.cancel()
     }
 
-    fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl 
RecordBatchReader + Send> {
+    fn get_info(
+        &self,
+        codes: Option<HashSet<InfoCode>>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         self.0.get_info(codes)
     }
 
@@ -86,7 +89,7 @@ impl adbc_core::Connection for Connection {
         table_name: Option<&str>,
         table_type: Option<Vec<&str>>,
         column_name: Option<&str>,
-    ) -> Result<impl RecordBatchReader + Send> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         self.0.get_objects(
             depth,
             catalog,
@@ -106,11 +109,11 @@ impl adbc_core::Connection for Connection {
         self.0.get_table_schema(catalog, db_schema, table_name)
     }
 
-    fn get_table_types(&self) -> Result<impl RecordBatchReader + Send> {
+    fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         self.0.get_table_types()
     }
 
-    fn get_statistic_names(&self) -> Result<impl RecordBatchReader + Send> {
+    fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         self.0.get_statistic_names()
     }
 
@@ -120,7 +123,7 @@ impl adbc_core::Connection for Connection {
         db_schema: Option<&str>,
         table_name: Option<&str>,
         approximate: bool,
-    ) -> Result<impl RecordBatchReader + Send> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         self.0
             .get_statistics(catalog, db_schema, table_name, approximate)
     }
@@ -133,7 +136,10 @@ impl adbc_core::Connection for Connection {
         self.0.rollback()
     }
 
-    fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl 
RecordBatchReader + Send> {
+    fn read_partition(
+        &self,
+        partition: impl AsRef<[u8]>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         self.0.read_partition(partition)
     }
 }
diff --git a/rust/driver/snowflake/src/statement.rs 
b/rust/driver/snowflake/src/statement.rs
index eb41d1108..f956852b2 100644
--- a/rust/driver/snowflake/src/statement.rs
+++ b/rust/driver/snowflake/src/statement.rs
@@ -64,7 +64,7 @@ impl adbc_core::Statement for Statement {
         self.0.bind_stream(reader)
     }
 
-    fn execute(&mut self) -> Result<impl RecordBatchReader + Send> {
+    fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         self.0.execute()
     }
 
diff --git a/rust/driver_manager/src/lib.rs b/rust/driver_manager/src/lib.rs
index 36389b47d..b2cc03f3a 100644
--- a/rust/driver_manager/src/lib.rs
+++ b/rust/driver_manager/src/lib.rs
@@ -1158,7 +1158,10 @@ impl Connection for ManagedConnection {
         check_status(status, error)
     }
 
-    fn get_info(&self, codes: Option<HashSet<InfoCode>>) -> Result<impl 
RecordBatchReader> {
+    fn get_info(
+        &self,
+        codes: Option<HashSet<InfoCode>>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let mut stream = FFI_ArrowArrayStream::empty();
         let codes: Option<Vec<u32>> =
             codes.map(|codes| codes.iter().map(|code| code.into()).collect());
@@ -1181,7 +1184,7 @@ impl Connection for ManagedConnection {
         };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_objects(
@@ -1192,7 +1195,7 @@ impl Connection for ManagedConnection {
         table_name: Option<&str>,
         table_type: Option<Vec<&str>>,
         column_name: Option<&str>,
-    ) -> Result<impl RecordBatchReader> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let catalog = catalog.map(CString::new).transpose()?;
         let db_schema = db_schema.map(CString::new).transpose()?;
         let table_name = table_name.map(CString::new).transpose()?;
@@ -1244,7 +1247,7 @@ impl Connection for ManagedConnection {
         check_status(status, error)?;
 
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_statistics(
@@ -1253,7 +1256,7 @@ impl Connection for ManagedConnection {
         db_schema: Option<&str>,
         table_name: Option<&str>,
         approximate: bool,
-    ) -> Result<impl RecordBatchReader> {
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         if let AdbcVersion::V100 = self.driver_version() {
             return Err(Error::with_message_and_status(
                 ERR_STATISTICS_UNSUPPORTED,
@@ -1287,10 +1290,10 @@ impl Connection for ManagedConnection {
         };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
-    fn get_statistic_names(&self) -> Result<impl RecordBatchReader> {
+    fn get_statistic_names(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         if let AdbcVersion::V100 = self.driver_version() {
             return Err(Error::with_message_and_status(
                 ERR_STATISTICS_UNSUPPORTED,
@@ -1305,7 +1308,7 @@ impl Connection for ManagedConnection {
         let status = unsafe { method(connection.deref_mut(), &mut stream, &mut 
error) };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn get_table_schema(
@@ -1341,7 +1344,7 @@ impl Connection for ManagedConnection {
         Ok((&schema).try_into()?)
     }
 
-    fn get_table_types(&self) -> Result<impl RecordBatchReader> {
+    fn get_table_types(&self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         let mut stream = FFI_ArrowArrayStream::empty();
         let driver = self.ffi_driver();
         let mut connection = self.inner.connection.lock().unwrap();
@@ -1350,10 +1353,13 @@ impl Connection for ManagedConnection {
         let status = unsafe { method(connection.deref_mut(), &mut stream, &mut 
error) };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
-    fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result<impl 
RecordBatchReader> {
+    fn read_partition(
+        &self,
+        partition: impl AsRef<[u8]>,
+    ) -> Result<Box<dyn RecordBatchReader + Send + 'static>> {
         let mut stream = FFI_ArrowArrayStream::empty();
         let driver = self.ffi_driver();
         let mut connection = self.inner.connection.lock().unwrap();
@@ -1371,7 +1377,7 @@ impl Connection for ManagedConnection {
         };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 }
 
@@ -1434,7 +1440,7 @@ impl Statement for ManagedStatement {
         check_status(status, error)
     }
 
-    fn execute(&mut self) -> Result<impl RecordBatchReader> {
+    fn execute(&mut self) -> Result<Box<dyn RecordBatchReader + Send + 
'static>> {
         let driver = self.ffi_driver();
         let mut statement = self.inner.statement.lock().unwrap();
         let mut error = adbc_ffi::FFI_AdbcError::with_driver(driver);
@@ -1443,7 +1449,7 @@ impl Statement for ManagedStatement {
         let status = unsafe { method(statement.deref_mut(), &mut stream, 
null_mut(), &mut error) };
         check_status(status, error)?;
         let reader = ArrowArrayStreamReader::try_new(stream)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn execute_schema(&mut self) -> Result<arrow_schema::Schema> {

Reply via email to