This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new c66d59f24 feat(rust/driver_manager): implement connection profiles (#3973)
c66d59f24 is described below

commit c66d59f24d61305e761e9f8f08e5332ad1a4257d
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 26 18:26:40 2026 -0500

    feat(rust/driver_manager): implement connection profiles (#3973)
    
    Implementing connection profiles for the Rust driver manager including
    the default FilesystemProfileProvider and updating the objects and tests
    accordingly.
---
 rust/Cargo.lock                                    |   8 +
 rust/driver_manager/Cargo.toml                     |   2 +
 rust/driver_manager/src/lib.rs                     | 233 ++++++-
 rust/driver_manager/src/profile.rs                 | 761 +++++++++++++++++++++
 rust/driver_manager/src/search.rs                  | 406 ++++++++++-
 rust/driver_manager/tests/connection_profile.rs    | 571 ++++++++++++++++
 rust/driver_manager/tests/test_env_var_profiles.rs | 331 +++++++++
 7 files changed, 2290 insertions(+), 22 deletions(-)

diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index e9f5ea94a..826c490c0 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -36,6 +36,8 @@ dependencies = [
  "arrow-schema",
  "arrow-select",
  "libloading",
+ "path-slash",
+ "regex",
  "temp-env",
  "tempfile",
  "toml",
@@ -1870,6 +1872,12 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "path-slash"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
+
 [[package]]
 name = "pbjson"
 version = "0.8.0"
diff --git a/rust/driver_manager/Cargo.toml b/rust/driver_manager/Cargo.toml
index 74f057997..6afc0878b 100644
--- a/rust/driver_manager/Cargo.toml
+++ b/rust/driver_manager/Cargo.toml
@@ -40,6 +40,8 @@ adbc_ffi.workspace = true
 arrow-array.workspace = true
 arrow-schema.workspace = true
 libloading = "0.8.8"
+path-slash = "0.2.1"
+regex = "1.12.3"
 toml = { version = "1.0.3", default-features = false, features = [
     "parse",
     "display",
diff --git a/rust/driver_manager/src/lib.rs b/rust/driver_manager/src/lib.rs
index 6464ab399..439cb27e6 100644
--- a/rust/driver_manager/src/lib.rs
+++ b/rust/driver_manager/src/lib.rs
@@ -101,6 +101,7 @@
 // would prevent any parallelism between driver calls, which is not desirable.
 
 pub mod error;
+pub mod profile;
 pub(crate) mod search;
 
 use std::collections::HashSet;
@@ -127,7 +128,10 @@ use adbc_core::{
 };
 use adbc_ffi::driver_method;
 
-use self::search::{parse_driver_uri, DriverLibrary};
+use self::search::{parse_driver_uri, DriverLibrary, DriverLocator};
+use crate::profile::{
+    process_profile_value, ConnectionProfile, ConnectionProfileProvider, 
FilesystemProfileProvider,
+};
 
 const ERR_CANCEL_UNSUPPORTED: &str =
     "Canceling connection or statement is not supported with ADBC 1.0.0";
@@ -354,6 +358,67 @@ pub struct ManagedDatabase {
 }
 
 impl ManagedDatabase {
+    /// Creates a new database connection from a URI string.
+    ///
+    /// This method supports both direct driver URIs and profile references.
+    ///
+    /// # URI Formats
+    ///
+    /// ## Direct Driver Connection
+    /// - `"driver_name:connection_string"` - Loads the specified driver and connects
+    /// - `"driver_name://host:port/database"` - Standard database URI format
+    ///
+    /// ## Profile Reference
+    /// - `"profile://name"` - Loads connection configuration from a profile file
+    /// - `"profile:///absolute/path/to/profile.toml"` - Absolute path to profile
+    /// - `"profile://relative/path/to/profile.toml"` - Relative path to profile
+    ///
+    /// # Arguments
+    ///
+    /// * `uri` - The connection URI or profile reference
+    /// * `entrypoint` - Optional driver entrypoint name (uses default if `None`)
+    /// * `version` - ADBC version to use
+    /// * `load_flags` - Flags controlling driver loading behavior
+    /// * `additional_search_paths` - Optional paths to search for drivers or profiles
+    ///
+    /// # Returns
+    ///
+    /// A configured `ManagedDatabase` ready for creating connections.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The URI format is invalid
+    /// - The driver cannot be loaded
+    /// - The profile cannot be found or parsed
+    /// - Database initialization fails
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use adbc_core::options::AdbcVersion;
+    /// use adbc_driver_manager::ManagedDatabase;
+    /// use adbc_core::LOAD_FLAG_DEFAULT;
+    ///
+    /// // Direct connection
+    /// let db = ManagedDatabase::from_uri(
+    ///     "sqlite::memory:",
+    ///     None,
+    ///     AdbcVersion::V100,
+    ///     LOAD_FLAG_DEFAULT,
+    ///     None
+    /// )?;
+    ///
+    /// // Profile connection
+    /// let db = ManagedDatabase::from_uri(
+    ///     "profile://my_database",
+    ///     None,
+    ///     AdbcVersion::V100,
+    ///     LOAD_FLAG_DEFAULT,
+    ///     None
+    /// )?;
+    /// # Ok::<(), adbc_core::error::Error>(())
+    /// ```
     pub fn from_uri(
         uri: &str,
         entrypoint: Option<&[u8]>,
@@ -371,6 +436,53 @@ impl ManagedDatabase {
         )
     }
 
+    /// Creates a new database connection from a URI with additional options.
+    ///
+    /// This is similar to [`from_uri`](Self::from_uri), but allows passing additional
+    /// database options that override any options from profiles.
+    ///
+    /// # Arguments
+    ///
+    /// * `uri` - The connection URI or profile reference
+    /// * `entrypoint` - Optional driver entrypoint name
+    /// * `version` - ADBC version to use
+    /// * `load_flags` - Flags controlling driver loading behavior
+    /// * `additional_search_paths` - Optional paths to search for drivers or profiles
+    /// * `opts` - Database options to apply (override profile options)
+    ///
+    /// # Returns
+    ///
+    /// A configured `ManagedDatabase` with the specified options applied.
+    ///
+    /// # Option Priority
+    ///
+    /// Options are applied in this order (later values override earlier ones):
+    /// 1. Profile options (if using a profile URI)
+    /// 2. Options provided via `opts` parameter
+    /// 3. URI connection string (for direct driver URIs)
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue};
+    /// use adbc_driver_manager::ManagedDatabase;
+    /// use adbc_core::LOAD_FLAG_DEFAULT;
+    ///
+    /// let opts = vec![
+    ///     (OptionDatabase::Username, OptionValue::String("user".to_string())),
+    ///     (OptionDatabase::Password, OptionValue::String("pass".to_string())),
+    /// ];
+    ///
+    /// let db = ManagedDatabase::from_uri_with_opts(
+    ///     "profile://my_database",
+    ///     None,
+    ///     AdbcVersion::V100,
+    ///     LOAD_FLAG_DEFAULT,
+    ///     None,
+    ///     opts,
+    /// )?;
+    /// # Ok::<(), adbc_core::error::Error>(())
+    /// ```
     pub fn from_uri_with_opts(
         uri: &str,
         entrypoint: Option<&[u8]>,
@@ -379,20 +491,121 @@ impl ManagedDatabase {
         additional_search_paths: Option<Vec<PathBuf>>,
         opts: impl IntoIterator<Item = (<Self as Optionable>::Option, 
OptionValue)>,
     ) -> Result<Self> {
-        let (driver, final_uri) = parse_driver_uri(uri)?;
-
-        let mut drv = ManagedDriver::load_from_name(
-            driver,
+        let profile_provider = FilesystemProfileProvider;
+        Self::from_uri_with_profile_provider(
+            uri,
             entrypoint,
             version,
             load_flags,
             additional_search_paths,
-        )?;
+            profile_provider,
+            opts,
+        )
+    }
+
+    /// Creates a new database connection from a URI with a custom profile provider.
+    ///
+    /// This advanced method allows using a custom implementation of
+    /// [`ConnectionProfileProvider`] to load profiles from alternative sources
+    /// (e.g., remote configuration services, encrypted storage, etc.).
+    ///
+    /// # Arguments
+    ///
+    /// * `uri` - The connection URI or profile reference
+    /// * `entrypoint` - Optional driver entrypoint name
+    /// * `version` - ADBC version to use
+    /// * `load_flags` - Flags controlling driver loading behavior
+    /// * `additional_search_paths` - Optional paths to search for drivers or profiles
+    /// * `profile_provider` - Custom profile provider implementation
+    /// * `opts` - Database options to apply (override profile options)
+    ///
+    /// # Returns
+    ///
+    /// A configured `ManagedDatabase` using the custom profile provider.
+    ///
+    /// # Option Priority
+    ///
+    /// Options are applied in this order (later values override earlier ones):
+    /// 1. Profile options (if using a profile URI); string values have their
+    ///    `{{ env_var(NAME) }}` expressions expanded first
+    /// 2. Options provided via the `opts` parameter
+    /// 3. URI connection string (for direct driver URIs)
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The URI cannot be parsed
+    /// - The profile cannot be retrieved from `profile_provider`
+    /// - The driver cannot be loaded
+    /// - A profile option value contains an invalid replacement expression
+    /// - Database initialization fails
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue};
+    /// use adbc_driver_manager::ManagedDatabase;
+    /// use adbc_driver_manager::profile::FilesystemProfileProvider;
+    /// use adbc_core::LOAD_FLAG_DEFAULT;
+    ///
+    /// let provider = FilesystemProfileProvider;
+    /// let opts = vec![(OptionDatabase::Username, OptionValue::String("admin".to_string()))];
+    ///
+    /// let db = ManagedDatabase::from_uri_with_profile_provider(
+    ///     "profile://my_database",
+    ///     None,
+    ///     AdbcVersion::V100,
+    ///     LOAD_FLAG_DEFAULT,
+    ///     None,
+    ///     provider,
+    ///     opts,
+    /// )?;
+    /// # Ok::<(), adbc_core::error::Error>(())
+    /// ```
+    pub fn from_uri_with_profile_provider(
+        uri: &str,
+        entrypoint: Option<&[u8]>,
+        version: AdbcVersion,
+        load_flags: LoadFlags,
+        additional_search_paths: Option<Vec<PathBuf>>,
+        profile_provider: impl ConnectionProfileProvider,
+        opts: impl IntoIterator<Item = (<Self as Optionable>::Option, OptionValue)>,
+    ) -> Result<Self> {
+        // `leading_opts` are applied before the caller's `opts` and `trailing_opt`
+        // after them, so that later entries override earlier ones as documented.
+        let (mut drv, leading_opts, trailing_opt) = match parse_driver_uri(uri)? {
+            DriverLocator::Uri(driver, final_uri) => {
+                let drv = ManagedDriver::load_from_name(
+                    driver,
+                    entrypoint,
+                    version,
+                    load_flags,
+                    additional_search_paths,
+                )?;
+
+                // The URI from the connection string is applied last so that it
+                // takes precedence over a `Uri` entry passed through `opts`.
+                let uri_opt = (
+                    OptionDatabase::Uri,
+                    OptionValue::String(final_uri.to_string()),
+                );
+                (drv, Vec::new(), Some(uri_opt))
+            }
+            DriverLocator::Profile(profile) => {
+                // The search paths are needed again below when the driver is
+                // loaded by name, hence the clone.
+                let profile =
+                    profile_provider.get_profile(profile, additional_search_paths.clone())?;
+                let (driver_name, init_func) = profile.get_driver_name()?;
+
+                // A profile may carry a statically linked init function; otherwise
+                // the driver is looked up by name.
+                let drv = match init_func {
+                    Some(init_fn) => ManagedDriver::load_static(init_fn, version)?,
+                    None => ManagedDriver::load_from_name(
+                        driver_name,
+                        entrypoint,
+                        version,
+                        load_flags,
+                        additional_search_paths,
+                    )?,
+                };
+
+                // Only string values can contain replacement expressions; all
+                // other value kinds are passed through untouched.
+                let profile_opts = profile
+                    .get_options()?
+                    .into_iter()
+                    .map(|(k, v)| match v {
+                        OptionValue::String(s) => {
+                            process_profile_value(&s).map(|expanded| (k, expanded))
+                        }
+                        other => Ok((k, other)),
+                    })
+                    .collect::<Result<Vec<(OptionDatabase, OptionValue)>>>()?;
+                (drv, profile_opts, None)
+            }
+        };
 
-        drv.new_database_with_opts(opts.into_iter().chain(std::iter::once((
-            OptionDatabase::Uri,
-            OptionValue::String(final_uri.to_string()),
-        ))))
+        drv.new_database_with_opts(leading_opts.into_iter().chain(opts).chain(trailing_opt))
     }
 
     fn ffi_driver(&self) -> &adbc_ffi::FFI_AdbcDriver {
diff --git a/rust/driver_manager/src/profile.rs 
b/rust/driver_manager/src/profile.rs
new file mode 100644
index 000000000..40a668d2f
--- /dev/null
+++ b/rust/driver_manager/src/profile.rs
@@ -0,0 +1,761 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::search::find_filesystem_profile;
+use crate::ManagedDatabase;
+use adbc_core::{
+    error::{Error, Result, Status},
+    options::{OptionDatabase, OptionValue},
+    Optionable,
+};
+use adbc_ffi::FFI_AdbcDriverInitFunc;
+use regex::{Captures, Regex};
+use std::path::PathBuf;
+use std::{collections::HashMap, fs};
+use std::{env, fmt, sync::OnceLock};
+use toml::de::{DeTable, DeValue};
+
+/// A connection profile that provides configuration for creating ADBC database connections.
+///
+/// Profiles contain the driver name, optional initialization function, and database options
+/// that can be used to create a configured database connection without needing to specify
+/// all connection details programmatically.
+pub trait ConnectionProfile {
+    /// Returns the driver name and an optional static initialization function.
+    ///
+    /// # Returns
+    ///
+    /// A tuple containing:
+    /// - A string slice with the driver name (e.g., "adbc_driver_sqlite")
+    /// - An optional reference to a statically-linked driver initialization function
+    ///
+    /// When the initialization function is `Some`,
+    /// `ManagedDatabase::from_uri_with_profile_provider` loads the driver through it
+    /// instead of looking the driver up by name.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the profile is malformed or cannot be read.
+    fn get_driver_name(&self) -> Result<(&str, Option<&FFI_AdbcDriverInitFunc>)>;
+
+    /// Returns an iterator of database options to apply when creating a connection.
+    ///
+    /// # Returns
+    ///
+    /// An iterator yielding `(OptionDatabase, OptionValue)` tuples that should be
+    /// applied to the database before initialization.
+    ///
+    /// `ManagedDatabase::from_uri_with_profile_provider` passes every
+    /// `OptionValue::String` yielded here through `process_profile_value`, which
+    /// expands `{{ env_var(NAME) }}` expressions; other value kinds are used as-is.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the options cannot be retrieved or parsed.
+    fn get_options(
+        &self,
+    ) -> Result<impl IntoIterator<Item = (<ManagedDatabase as Optionable>::Option, OptionValue)>>;
+}
+
+/// Provides access to connection profiles from a specific storage backend.
+///
+/// Implementations of this trait define how profiles are located and loaded,
+/// such as from the filesystem, a configuration service, or other sources.
+pub trait ConnectionProfileProvider {
+    /// The concrete profile type returned by this provider.
+    type Profile: ConnectionProfile;
+
+    /// Retrieves a connection profile by name.
+    ///
+    /// # Arguments
+    ///
+    /// * `name` - The profile name or path to locate
+    /// * `additional_path_list` - Optional additional directories to search for profiles
+    ///   (`ManagedDatabase::from_uri_with_profile_provider` forwards its
+    ///   `additional_search_paths` argument here)
+    ///
+    /// # Returns
+    ///
+    /// The loaded connection profile.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The profile cannot be found
+    /// - The profile file is malformed
+    /// - The profile version is unsupported
+    fn get_profile(
+        &self,
+        name: &str,
+        additional_path_list: Option<Vec<PathBuf>>,
+    ) -> Result<Self::Profile>;
+}
+
+/// Provides connection profiles from TOML files on the filesystem.
+///
+/// This provider searches for profile files with a `.toml` extension in standard
+/// configuration directories and any additional paths provided. Profile files must
+/// conform to the ADBC profile specification version 1.
+///
+/// The provider is a unit struct and carries no state of its own.
+///
+/// # Search Order
+///
+/// Profiles are searched in the following order:
+/// 1. Additional paths provided via `get_profile()`
+/// 2. `ADBC_PROFILE_PATH` environment variable paths
+/// 3. User configuration directory (`~/.config/adbc/profiles` on Linux,
+///    `~/Library/Application Support/ADBC/Profiles` on macOS,
+///    `%LOCALAPPDATA%\ADBC\Profiles` on Windows)
+///
+/// # Example
+///
+/// ```no_run
+/// use adbc_driver_manager::profile::{
+///     ConnectionProfileProvider, FilesystemProfileProvider
+/// };
+///
+/// let provider = FilesystemProfileProvider;
+/// let profile = provider.get_profile("my_database", None)?;
+/// # Ok::<(), adbc_core::error::Error>(())
+/// ```
+pub struct FilesystemProfileProvider;
+
+impl ConnectionProfileProvider for FilesystemProfileProvider {
+    type Profile = FilesystemProfile;
+
+    /// Locates the profile file for `name` with `find_filesystem_profile` and
+    /// parses it with `FilesystemProfile::from_path`.
+    ///
+    /// Errors from either step are propagated to the caller.
+    fn get_profile(
+        &self,
+        name: &str,
+        additional_path_list: Option<Vec<PathBuf>>,
+    ) -> Result<Self::Profile> {
+        let profile_path = find_filesystem_profile(name, additional_path_list)?;
+        FilesystemProfile::from_path(profile_path)
+    }
+}
+
+/// Flattens a TOML table into database options.
+///
+/// Every scalar entry of `table` is stored in `opts` under `prefix` followed by
+/// the entry's key. Nested tables are walked recursively, extending the prefix
+/// with the table's key and a trailing `.`, so `[a] b = 1` becomes `a.b`.
+///
+/// # Arguments
+///
+/// * `opts` - Map that receives the parsed options
+/// * `prefix` - Key prefix accumulated from enclosing tables (empty at the top level)
+/// * `table` - TOML table to process
+///
+/// # Value Conversion
+///
+/// - String values → `OptionValue::String`
+/// - Integer values → `OptionValue::Int`
+/// - Float values → `OptionValue::Double`
+/// - Boolean values → `OptionValue::String` (`"true"` or `"false"`)
+/// - Tables → no entry of their own; their contents are processed recursively
+///
+/// # Errors
+///
+/// Returns `Status::InvalidArguments` if:
+/// - An integer value cannot be parsed as `i64`
+/// - A float value cannot be parsed as `f64`
+/// - Any other TOML value kind is encountered (e.g., arrays)
+fn process_options(
+    opts: &mut HashMap<OptionDatabase, OptionValue>,
+    prefix: &str,
+    table: &DeTable,
+) -> Result<()> {
+    for (key, value) in table.iter() {
+        let full_key = format!("{}{}", prefix, key.get_ref());
+
+        // Scalars evaluate to the value to store; tables and unsupported kinds
+        // leave the iteration early instead.
+        let converted = match value.get_ref() {
+            DeValue::String(text) => OptionValue::String(text.to_string()),
+            DeValue::Integer(raw) => {
+                let parsed = raw.as_str().parse::<i64>().map_err(|e| {
+                    Error::with_message_and_status(
+                        format!("invalid integer value for key '{}': {e}", full_key),
+                        Status::InvalidArguments,
+                    )
+                })?;
+                OptionValue::Int(parsed)
+            }
+            DeValue::Float(raw) => {
+                let parsed = raw.as_str().parse::<f64>().map_err(|e| {
+                    Error::with_message_and_status(
+                        format!("invalid float value for key '{}': {e}", full_key),
+                        Status::InvalidArguments,
+                    )
+                })?;
+                OptionValue::Double(parsed)
+            }
+            DeValue::Boolean(flag) => OptionValue::String(flag.to_string()),
+            DeValue::Table(nested) => {
+                process_options(opts, &format!("{}.", full_key), nested)?;
+                continue;
+            }
+            _ => {
+                return Err(Error::with_message_and_status(
+                    format!("unsupported option type for key '{}'", full_key),
+                    Status::InvalidArguments,
+                ));
+            }
+        };
+
+        opts.insert(full_key.as_str().into(), converted);
+    }
+    Ok(())
+}
+
+/// A connection profile loaded from a TOML file on the filesystem.
+///
+/// This profile contains:
+/// - The path to the profile file
+/// - The driver name specified in the profile
+/// - A map of database options parsed from the profile
+///
+/// # Profile Format
+///
+/// Profile files must be valid TOML with the following structure:
+///
+/// ```toml
+/// version = 1
+/// driver = "driver_name"
+///
+/// [options]
+/// option_key = "option_value"
+/// nested.key = "nested_value"
+/// ```
+///
+/// Currently, only version 1 profiles are supported.
+#[derive(Debug)]
+pub struct FilesystemProfile {
+    // Location the profile was read from; used by the `Display` impl.
+    profile_path: PathBuf,
+    // Value of the top-level `driver` key (empty when the key is absent or not a string).
+    driver: String,
+    // Options from the `[options]` table, with nested tables flattened to dotted keys.
+    opts: HashMap<OptionDatabase, OptionValue>,
+}
+
+impl FilesystemProfile {
+    /// Reads and parses the TOML profile stored at `profile_path`.
+    ///
+    /// # Arguments
+    ///
+    /// * `profile_path` - Path to the TOML profile file
+    ///
+    /// # Returns
+    ///
+    /// A `FilesystemProfile` holding the path, the `driver` value and the
+    /// flattened contents of the `[options]` table.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Status::InvalidArguments` if:
+    /// - The file cannot be read
+    /// - The TOML is malformed
+    /// - The profile version is an integer other than `1`
+    /// - The `options` table is missing or is not a table
+    /// - Any option value cannot be converted
+    fn from_path(profile_path: PathBuf) -> Result<Self> {
+        // Every failure in this function is reported with the same status.
+        let invalid =
+            |message: String| Error::with_message_and_status(message, Status::InvalidArguments);
+
+        let contents = match fs::read_to_string(&profile_path) {
+            Ok(text) => text,
+            Err(e) => {
+                return Err(invalid(format!(
+                    "could not read profile '{}': {e}",
+                    profile_path.display()
+                )));
+            }
+        };
+
+        let document = DeTable::parse(&contents).map_err(|e| invalid(e.to_string()))?;
+        let root = document.get_ref();
+
+        // NOTE(review): a `version` entry that is absent, or presumably one that is
+        // not an integer, falls back to "1" and is accepted — confirm this is intended.
+        let profile_version = match root.get("version").and_then(|v| v.get_ref().as_integer()) {
+            Some(number) => number.as_str(),
+            None => "1",
+        };
+        if profile_version != "1" {
+            return Err(invalid(format!(
+                "unsupported profile version '{}', expected '1'",
+                profile_version
+            )));
+        }
+
+        // A missing or non-string `driver` entry yields an empty driver name.
+        let driver = root
+            .get("driver")
+            .and_then(|v| v.get_ref().as_str())
+            .unwrap_or("")
+            .to_string();
+
+        let Some(options_table) = root.get("options").and_then(|v| v.get_ref().as_table()) else {
+            return Err(invalid(
+                "missing or invalid 'options' table in profile".to_string(),
+            ));
+        };
+
+        let mut opts = HashMap::new();
+        process_options(&mut opts, "", options_table)?;
+
+        Ok(FilesystemProfile {
+            profile_path,
+            driver,
+            opts,
+        })
+    }
+}
+
+impl ConnectionProfile for FilesystemProfile {
+    /// Returns the driver name read from the profile file.
+    ///
+    /// Filesystem profiles never carry a static initialization function, so the
+    /// second element is always `None`.
+    fn get_driver_name(&self) -> Result<(&str, Option<&FFI_AdbcDriverInitFunc>)> {
+        let driver_name = self.driver.as_str();
+        Ok((driver_name, None))
+    }
+
+    /// Returns a copy of the options parsed from the profile's `[options]` table.
+    fn get_options(
+        &self,
+    ) -> Result<impl IntoIterator<Item = (<ManagedDatabase as Optionable>::Option, OptionValue)>>
+    {
+        // The map is cloned so the profile can hand out its options repeatedly.
+        let owned_opts = self.opts.clone();
+        Ok(owned_opts.into_iter())
+    }
+}
+
+/// Formats the profile as `FilesystemProfile(<path>)`, where `<path>` is the
+/// location the profile was loaded from.
+impl fmt::Display for FilesystemProfile {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FilesystemProfile({})", self.profile_path.display())
+    }
+}
+
+/// Returns the regex that matches `{{ ... }}` replacement expressions in
+/// profile values, compiling it on first use and reusing it afterwards.
+///
+/// Capture group 1 holds the text between the braces with surrounding
+/// whitespace excluded; the inner text may not itself contain `{` or `}`.
+fn profile_value_regex() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    // The pattern is a constant, so a compile failure would be a programming error.
+    RE.get_or_init(|| Regex::new(r"\{\{\s*([^{}]*?)\s*\}\}").unwrap())
+}
+
+/// Expands `{{ env_var(NAME) }}` expressions inside a profile option value.
+///
+/// Each `{{ ... }}` expression found by [`profile_value_regex`] is replaced by
+/// the value of the named environment variable; text outside the expressions
+/// is copied verbatim. A variable that is not set expands to the empty string.
+///
+/// # Returns
+///
+/// The expanded text wrapped in `OptionValue::String`.
+///
+/// # Errors
+///
+/// Returns `Status::InvalidArguments` if:
+/// - An expression is not of the form `env_var(...)`
+/// - The variable name inside `env_var(...)` is empty
+/// - The environment variable cannot be retrieved for a reason other than
+///   being absent
+pub(crate) fn process_profile_value(value: &str) -> Result<OptionValue> {
+    let invalid =
+        |message: String| Error::with_message_and_status(message, Status::InvalidArguments);
+
+    // Resolves a single `{{ ... }}` match to its replacement text.
+    let expand = |caps: &Captures| -> Result<String> {
+        let content = caps.get(1).unwrap().as_str();
+
+        let Some(argument) = content
+            .strip_prefix("env_var(")
+            .and_then(|rest| rest.strip_suffix(")"))
+        else {
+            return Err(invalid(format!(
+                "invalid profile replacement expression '{{{{ {} }}}}'",
+                content
+            )));
+        };
+
+        let env_var_name = argument.trim();
+        if env_var_name.is_empty() {
+            return Err(invalid(format!(
+                "empty environment variable name in profile replacement expression '{{{{ {} }}}}'",
+                content
+            )));
+        }
+
+        match env::var(env_var_name) {
+            Ok(val) => Ok(val),
+            // An unset variable is not an error; it expands to nothing.
+            Err(env::VarError::NotPresent) => Ok(String::new()),
+            Err(e) => Err(invalid(format!(
+                "error retrieving environment variable '{}' for profile replacement expression '{{{{ {} }}}}': {}",
+                env_var_name, content, e
+            ))),
+        }
+    };
+
+    // Copy the text between matches unchanged and splice in each expansion.
+    let mut expanded = String::with_capacity(value.len());
+    let mut cursor = 0;
+    for caps in profile_value_regex().captures_iter(value) {
+        let whole = caps.get(0).unwrap();
+        expanded.push_str(&value[cursor..whole.start()]);
+        expanded.push_str(&expand(&caps)?);
+        cursor = whole.end();
+    }
+    expanded.push_str(&value[cursor..]);
+
+    Ok(OptionValue::String(expanded))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::collections::HashMap;
+    use toml::de::DeTable;
+
+    #[test]
+    fn test_process_options_basic_types() {
+        let test_cases = vec![
+            (
+                "string value",
+                r#"key = "value""#,
+                vec![("key", OptionValue::String("value".to_string()))],
+            ),
+            (
+                "integer value",
+                r#"port = 5432"#,
+                vec![("port", OptionValue::Int(5432))],
+            ),
+            (
+                "float value",
+                r#"timeout = 30.5"#,
+                vec![("timeout", OptionValue::Double(30.5))],
+            ),
+            (
+                "boolean values",
+                r#"enabled = true
+disabled = false"#,
+                vec![
+                    ("enabled", OptionValue::String("true".to_string())),
+                    ("disabled", OptionValue::String("false".to_string())),
+                ],
+            ),
+            (
+                "multiple types",
+                r#"str = "text"
+num = 42
+flt = 1.5
+flag = true"#,
+                vec![
+                    ("str", OptionValue::String("text".to_string())),
+                    ("num", OptionValue::Int(42)),
+                    ("flt", OptionValue::Double(1.5)),
+                    ("flag", OptionValue::String("true".to_string())),
+                ],
+            ),
+        ];
+
+        for (name, toml_str, expected_opts) in test_cases {
+            let table = DeTable::parse(toml_str).unwrap();
+            let mut opts = HashMap::new();
+            process_options(&mut opts, "", table.get_ref())
+                .unwrap_or_else(|_| panic!("Failed to process options for test 
case: {}", name));
+
+            assert_eq!(
+                opts.len(),
+                expected_opts.len(),
+                "Test case '{}': expected {} options, got {}",
+                name,
+                expected_opts.len(),
+                opts.len()
+            );
+
+            for (key_str, expected_value) in expected_opts {
+                let key = OptionDatabase::from(key_str);
+                let actual_value = opts
+                    .get(&key)
+                    .unwrap_or_else(|| panic!("Test case '{}': missing key 
'{}'", name, key_str));
+
+                match (actual_value, &expected_value) {
+                    (OptionValue::String(a), OptionValue::String(e)) => {
+                        assert_eq!(
+                            a, e,
+                            "Test case '{}': string mismatch for key '{}'",
+                            name, key_str
+                        );
+                    }
+                    (OptionValue::Int(a), OptionValue::Int(e)) => {
+                        assert_eq!(
+                            a, e,
+                            "Test case '{}': int mismatch for key '{}'",
+                            name, key_str
+                        );
+                    }
+                    (OptionValue::Double(a), OptionValue::Double(e)) => {
+                        assert!(
+                            (a - e).abs() < 1e-10,
+                            "Test case '{}': float mismatch for key '{}'",
+                            name,
+                            key_str
+                        );
+                    }
+                    _ => panic!("Test case '{}': type mismatch for key '{}'", 
name, key_str),
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_process_options_nested_table() {
+        let toml_str = r#"
+key = "value"
+[nested]
+subkey = "subvalue"
+number = 42
+"#;
+        let table = DeTable::parse(toml_str).unwrap();
+        let mut opts = HashMap::new();
+        process_options(&mut opts, "", table.get_ref()).unwrap();
+
+        assert_eq!(opts.len(), 3);
+
+        let key1 = OptionDatabase::from("key");
+        match opts.get(&key1) {
+            Some(OptionValue::String(s)) => assert_eq!(s, "value"),
+            _ => panic!("Expected string value"),
+        }
+
+        let key2 = OptionDatabase::from("nested.subkey");
+        match opts.get(&key2) {
+            Some(OptionValue::String(s)) => assert_eq!(s, "subvalue"),
+            _ => panic!("Expected string value for nested.subkey"),
+        }
+
+        let key3 = OptionDatabase::from("nested.number");
+        match opts.get(&key3) {
+            Some(OptionValue::Int(i)) => assert_eq!(*i, 42),
+            _ => panic!("Expected int value for nested.number"),
+        }
+    }
+
+    #[test]
+    fn test_process_options_deeply_nested() {
+        let toml_str = r#"
+[level1]
+[level1.level2]
+[level1.level2.level3]
+deep = "value"
+"#;
+        let table = DeTable::parse(toml_str).unwrap();
+        let mut opts = HashMap::new();
+        process_options(&mut opts, "", table.get_ref()).unwrap();
+
+        assert_eq!(opts.len(), 1);
+
+        let key = OptionDatabase::from("level1.level2.level3.deep");
+        match opts.get(&key) {
+            Some(OptionValue::String(s)) => assert_eq!(s, "value"),
+            _ => panic!("Expected string value"),
+        }
+    }
+
+    /// Option values that cannot be converted are rejected with
+    /// `Status::InvalidArguments` and a descriptive message.
+    #[test]
+    fn test_process_options_error_cases() {
+        // (case name, TOML input, fragment expected in the error message)
+        let scenarios = [
+            (
+                "invalid integer (too large)",
+                r#"bad_int = 999999999999999999999999999999"#,
+                "invalid integer value",
+            ),
+            (
+                "unsupported type (array)",
+                r#"array = [1, 2, 3]"#,
+                "unsupported option type",
+            ),
+        ];
+
+        for (name, toml_str, expected_error_msg) in scenarios {
+            let parsed = DeTable::parse(toml_str).unwrap();
+            let mut opts = HashMap::new();
+
+            let outcome = process_options(&mut opts, "", parsed.get_ref());
+            let Err(err) = outcome else {
+                panic!("Test case '{}': expected error but got Ok", name);
+            };
+
+            assert_eq!(
+                err.status,
+                Status::InvalidArguments,
+                "Test case '{}': wrong status",
+                name
+            );
+            assert!(
+                err.message.contains(expected_error_msg),
+                "Test case '{}': expected '{}' in error message, got '{}'",
+                name,
+                expected_error_msg,
+                err.message
+            );
+        }
+    }
+
+    /// `FilesystemProfile::from_path` reports unreadable files and
+    /// unsupported `version` values as `Status::InvalidArguments`.
+    #[test]
+    fn test_filesystem_profile_from_path_errors() {
+        // (case name, path or file name, file content to write or `None` to
+        // use the path as-is, fragment expected in the error message)
+        let test_cases = vec![
+            (
+                "missing file",
+                "/nonexistent/path/to/profile.toml",
+                None,
+                "could not read profile",
+            ),
+            (
+                "invalid version (too high)",
+                "invalid_version_high.toml",
+                Some(
+                    r#"
+version = 99
+driver = "test_driver"
+
+[options]
+key = "value"
+"#,
+                ),
+                "unsupported profile version '99', expected '1'",
+            ),
+            (
+                "version 0",
+                "version_zero.toml",
+                Some(
+                    r#"
+version = 0
+driver = "test_driver"
+
+[options]
+key = "value"
+"#,
+                ),
+                "unsupported profile version '0', expected '1'",
+            ),
+            (
+                "version 2",
+                "version_two.toml",
+                Some(
+                    r#"
+version = 2
+driver = "test_driver"
+
+[options]
+key = "value"
+"#,
+                ),
+                "unsupported profile version '2', expected '1'",
+            ),
+        ];
+
+        for (name, filename, content_opt, expected_error_msg) in test_cases {
+            // When the case supplies content, write it into a fresh temporary
+            // directory. The directory guard is held until the assertions
+            // have run so the file exists while `from_path` reads it.
+            let (tmp_dir, profile_path) = match content_opt {
+                Some(content) => {
+                    let tmp_dir = tempfile::Builder::new()
+                        .prefix("adbc_profile_test")
+                        .tempdir()
+                        .unwrap();
+                    let path = tmp_dir.path().join(filename);
+                    std::fs::write(&path, content).unwrap();
+                    (Some(tmp_dir), path)
+                }
+                None => (None, PathBuf::from(filename)),
+            };
+
+            let result = FilesystemProfile::from_path(profile_path);
+
+            assert!(
+                result.is_err(),
+                "Test case '{}': expected error but got Ok",
+                name
+            );
+            let err = result.unwrap_err();
+            assert_eq!(
+                err.status,
+                Status::InvalidArguments,
+                "Test case '{}': wrong status",
+                name
+            );
+            assert!(
+                err.message.contains(expected_error_msg),
+                "Test case '{}': expected '{}' in error message, got '{}'",
+                name,
+                expected_error_msg,
+                err.message
+            );
+
+            if let Some(tmp_dir) = tmp_dir {
+                tmp_dir.close().unwrap();
+            }
+        }
+    }
+
+    /// `FilesystemProfileProvider` resolves a profile given either its
+    /// absolute path or a name looked up in the supplied search paths.
+    #[test]
+    fn test_filesystem_profile_provider() {
+        let profile_content = r#"
+version = 1
+driver = "test_driver"
+
+[options]
+test_key = "test_value"
+"#;
+
+        // (case name, profile name, extra search paths, expect success)
+        let test_cases = vec![
+            ("absolute path", "absolute_test.toml", None, true),
+            (
+                "search path with name only",
+                "search_test",
+                Some(vec![]),
+                true,
+            ),
+            (
+                "search path with .toml extension",
+                "search_test.toml",
+                Some(vec![]),
+                true,
+            ),
+        ];
+
+        for case in test_cases {
+            let (name, profile_name, search_paths_opt, should_succeed) = case;
+
+            let tmp_dir = tempfile::Builder::new()
+                .prefix("adbc_profile_test")
+                .tempdir()
+                .unwrap();
+
+            // The file on disk always carries the `.toml` extension.
+            let file_name = if profile_name.ends_with(".toml") {
+                profile_name.to_string()
+            } else {
+                format!("{}.toml", profile_name)
+            };
+            let profile_path = tmp_dir.path().join(file_name);
+            std::fs::write(&profile_path, profile_content).unwrap();
+
+            // Cases that use search paths get the temp dir appended.
+            let search_paths = search_paths_opt.map(|mut paths| {
+                paths.push(tmp_dir.path().to_path_buf());
+                paths
+            });
+
+            // The "absolute" case passes the full path instead of the name.
+            let profile_arg = if name.contains("absolute") {
+                profile_path.to_str().unwrap().to_string()
+            } else {
+                profile_name.to_string()
+            };
+
+            let provider = FilesystemProfileProvider;
+            let result = provider.get_profile(&profile_arg, search_paths);
+
+            if !should_succeed {
+                assert!(result.is_err(), "Test case '{}': expected error", name);
+            } else {
+                let profile = match result {
+                    Ok(profile) => profile,
+                    Err(e) => panic!("Test case '{}' failed: {:?}", name, e),
+                };
+                let (driver, _) = profile.get_driver_name().unwrap();
+                assert_eq!(
+                    driver, "test_driver",
+                    "Test case '{}': driver mismatch",
+                    name
+                );
+            }
+
+            tmp_dir.close().unwrap();
+        }
+    }
+}
diff --git a/rust/driver_manager/src/search.rs 
b/rust/driver_manager/src/search.rs
index f9d7a59f7..c8a7ec399 100644
--- a/rust/driver_manager/src/search.rs
+++ b/rust/driver_manager/src/search.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use path_slash::PathBufExt;
 use std::borrow::Cow;
 use std::ffi::{c_void, OsStr};
 use std::fmt::Write;
@@ -816,7 +817,181 @@ fn get_search_paths(lvls: LoadFlags) -> Vec<PathBuf> {
     result
 }
 
-pub(crate) fn parse_driver_uri(uri: &str) -> Result<(&str, &str)> {
+/// Locates a connection profile file on the filesystem.
+///
+/// Profiles are TOML files. `name` is resolved using the first rule that
+/// applies:
+///
+/// 1. If `name` is an absolute path, it is returned without checking that
+///    the file exists. If its extension is not `toml`, the extension is set
+///    to `toml` first (an existing different extension is replaced, not
+///    appended to).
+/// 2. If `name` is a relative path to an existing file, resolved against
+///    the current working directory, it is returned unchanged.
+/// 3. Otherwise the profile directories are searched in order for `name`,
+///    with its extension set to `toml` if it is not already:
+///    - Additional paths provided
+///    - `ADBC_PROFILE_PATH` environment variable paths
+///    - Conda prefix path (if built with `conda_build`)
+///    - User configuration directory
+///
+/// # Arguments
+///
+/// * `name` - Profile name or path (e.g., "my_profile",
+///   "/path/to/profile.toml")
+/// * `additional_path_list` - Optional additional directories to search
+///
+/// # Returns
+///
+/// The path of the profile file. For rule 2 this is the relative path as
+/// given; for rule 3 it is the matching search directory joined with the
+/// profile name.
+///
+/// # Errors
+///
+/// Returns `Status::NotFound` if `name` is relative and no matching file
+/// exists in the current directory or in any search path.
+pub(crate) fn find_filesystem_profile(
+    name: &str,
+    additional_path_list: Option<Vec<PathBuf>>,
+) -> Result<PathBuf> {
+    // Convert the name to a PathBuf to ensure proper platform-specific path
+    // handling. This normalizes forward slashes to backslashes on Windows.
+    let profile_path = PathBuf::from_slash(name);
+    let has_toml_ext = profile_path
+        .extension()
+        .is_some_and(|ext| ext == "toml");
+
+    // Absolute paths are trusted as-is; existence is not checked here.
+    if profile_path.is_absolute() {
+        return Ok(if has_toml_ext {
+            profile_path
+        } else {
+            profile_path.with_extension("toml")
+        });
+    }
+
+    // A relative path naming an existing file wins over the search paths.
+    if profile_path.is_file() {
+        return Ok(profile_path);
+    }
+
+    // Otherwise take the first search directory that contains the profile.
+    get_profile_search_paths(additional_path_list)
+        .into_iter()
+        .find_map(|dir| {
+            let mut candidate = dir.join(&profile_path);
+            if !has_toml_ext {
+                candidate.set_extension("toml");
+            }
+            candidate.is_file().then_some(candidate)
+        })
+        .ok_or_else(|| {
+            Error::with_message_and_status(
+                format!("Profile not found: {}", name),
+                Status::NotFound,
+            )
+        })
+}
+
+/// Builds the ordered list of directories searched for connection profiles.
+///
+/// Entries appear in priority order, highest first:
+///
+/// 1. The caller-supplied `additional_path_list`
+/// 2. Each entry of the `ADBC_PROFILE_PATH` environment variable
+///    (colon/semicolon-separated paths)
+/// 3. Conda prefix path: `$CONDA_PREFIX/etc/adbc/drivers` (if built with
+///    `conda_build`)
+/// 4. User config directory:
+///    - Linux: `~/.config/adbc/profiles`
+///    - macOS: `~/Library/Application Support/ADBC/Profiles`
+///    - Windows: `%LOCALAPPDATA%\ADBC\Profiles`
+///
+/// # Arguments
+///
+/// * `additional_path_list` - Optional additional directories placed at the
+///   front of the search path
+///
+/// # Returns
+///
+/// A vector of paths to search for profiles, in priority order.
+fn get_profile_search_paths(
+    additional_path_list: Option<Vec<PathBuf>>,
+) -> Vec<PathBuf> {
+    // Name of the per-user profile directory; capitalized on Windows/macOS.
+    #[cfg(any(target_os = "windows", target_os = "macos"))]
+    const PROFILE_DIR_NAME: &str = "Profiles";
+    #[cfg(not(any(target_os = "windows", target_os = "macos")))]
+    const PROFILE_DIR_NAME: &str = "profiles";
+
+    let mut search_dirs = additional_path_list.unwrap_or_default();
+
+    // Entries from the ADBC_PROFILE_PATH environment variable come next.
+    if let Some(env_value) = env::var_os("ADBC_PROFILE_PATH") {
+        search_dirs.extend(env::split_paths(&env_value));
+    }
+
+    // Conda-specific location, only when built with conda_build.
+    // NOTE(review): this uses the `drivers` directory name for profiles;
+    // confirm that is intended rather than a profile-specific directory.
+    #[cfg(conda_build)]
+    if let Some(conda_prefix) = env::var_os("CONDA_PREFIX") {
+        let conda_dir = PathBuf::from(conda_prefix)
+            .join("etc")
+            .join("adbc")
+            .join("drivers");
+        search_dirs.push(conda_dir);
+    }
+
+    // The per-user directory sits next to the user config directory, i.e.
+    // under that directory's parent.
+    if let Some(config_dir) = user_config_dir() {
+        if let Some(base_dir) = config_dir.parent() {
+            search_dirs.push(base_dir.join(PROFILE_DIR_NAME));
+        }
+    }
+
+    search_dirs
+}
+
+/// Result of parsing a driver URI, indicating how to load the driver.
+///
+/// URIs can specify either a direct driver connection or a profile to load.
+/// Both variants borrow from the URI string that was parsed.
+#[derive(Debug)]
+pub(crate) enum DriverLocator<'a> {
+    /// Direct driver URI: (driver_name, connection_string)
+    ///
+    /// The connection string is the text after the first `:` for URIs such
+    /// as `driver:rest`. For `scheme:/...` style URIs, and when at most one
+    /// character follows the colon, it is the entire URI.
+    ///
+    /// Example: `"sqlite:file::memory:"` → `Uri("sqlite", "file::memory:")`
+    Uri(&'a str, &'a str),
+
+    /// Profile reference: (profile_name_or_path)
+    ///
+    /// Holds everything after the `profile://` prefix.
+    ///
+    /// Example: `"profile://my_database"` → `Profile("my_database")`
+    Profile(&'a str),
+}
+
+/// Parses a driver URI to determine the connection method.
+///
+/// # URI Formats
+///
+/// ## Direct Driver URI
+/// - `driver:connection_string` - Uses the specified driver with connection 
string
+/// - `driver://host:port/database` - Standard URI format with driver scheme
+///
+/// ## Profile URI
+/// - `profile://name` - Loads profile named "name" from standard locations
+/// - `profile://path/to/profile.toml` - Loads profile from relative path
+/// - `profile:///absolute/path/to/profile.toml` - Loads profile from absolute 
path
+///
+/// # Arguments
+///
+/// * `uri` - The URI string to parse
+///
+/// # Returns
+///
+/// A [`DriverLocator`] indicating whether to use a driver directly or load
+/// a profile.
+///
+/// # Errors
+///
+/// Returns `Status::InvalidArguments` if:
+/// - The URI has no colon separator
+/// - The URI format is invalid
+pub(crate) fn parse_driver_uri<'a>(uri: &'a str) -> Result<DriverLocator<'a>> {
     let idx = uri.find(":").ok_or(Error::with_message_and_status(
         format!("Invalid URI: {uri}"),
         Status::InvalidArguments,
@@ -824,20 +999,29 @@ pub(crate) fn parse_driver_uri(uri: &str) -> 
Result<(&str, &str)> {
 
     let driver = &uri[..idx];
     if uri.len() <= idx + 2 {
-        return Ok((driver, uri));
+        return Ok(DriverLocator::Uri(driver, uri));
     }
 
     #[cfg(target_os = "windows")]
     if let Ok(true) = std::fs::exists(uri) {
-        return Ok((uri, ""));
+        return Ok(DriverLocator::Uri(uri, ""));
     }
 
     if &uri[idx..idx + 2] == ":/" {
         // scheme is also driver
-        return Ok((driver, uri));
+        if driver == "profile" && uri.len() > idx + 2 {
+            // Check if it's "://" (two slashes) or just ":/" (one slash)
+            if uri.len() > idx + 3 && &uri[idx + 2..idx + 3] == "/" {
+                // It's "profile://..." - skip "://" (three characters)
+                return Ok(DriverLocator::Profile(&uri[idx + 3..]));
+            }
+            // It's "profile:/..." - skip ":/" (two characters)
+            return Ok(DriverLocator::Profile(&uri[idx + 2..]));
+        }
+        return Ok(DriverLocator::Uri(driver, uri));
     }
 
-    Ok((driver, &uri[idx + 1..]))
+    Ok(DriverLocator::Uri(driver, &uri[idx + 1..]))
 }
 
 #[cfg(test)]
@@ -1380,26 +1564,57 @@ mod tests {
     fn test_parse_driver_uri() {
         let cases = vec![
             ("sqlite", Err(Status::InvalidArguments)),
-            ("sqlite:", Ok(("sqlite", "sqlite:"))),
-            ("sqlite:file::memory:", Ok(("sqlite", "file::memory:"))),
+            ("sqlite:", Ok(DriverLocator::Uri("sqlite", "sqlite:"))),
+            (
+                "sqlite:file::memory:",
+                Ok(DriverLocator::Uri("sqlite", "file::memory:")),
+            ),
             (
                 "sqlite:file::memory:?cache=shared",
-                Ok(("sqlite", "file::memory:?cache=shared")),
+                Ok(DriverLocator::Uri("sqlite", "file::memory:?cache=shared")),
             ),
             (
                 "postgresql://a:b@localhost:9999/nonexistent",
-                Ok(("postgresql", 
"postgresql://a:b@localhost:9999/nonexistent")),
+                Ok(DriverLocator::Uri(
+                    "postgresql",
+                    "postgresql://a:b@localhost:9999/nonexistent",
+                )),
+            ),
+            (
+                "profile://my_profile",
+                Ok(DriverLocator::Profile("my_profile")),
+            ),
+            (
+                "profile://path/to/profile.toml",
+                Ok(DriverLocator::Profile("path/to/profile.toml")),
             ),
+            (
+                "profile:///absolute/path/to/profile.toml",
+                Ok(DriverLocator::Profile("/absolute/path/to/profile.toml")),
+            ),
+            ("invalid_uri", Err(Status::InvalidArguments)),
         ];
 
         for (input, expected) in cases {
             let result = parse_driver_uri(input);
             match expected {
-                Ok((exp_driver, exp_conn)) => {
-                    let (driver, conn) = result.expect("Expected Ok result");
+                Ok(DriverLocator::Uri(exp_driver, exp_conn)) => {
+                    let DriverLocator::Uri(driver, conn) = 
result.expect("Expected Ok result")
+                    else {
+                        panic!("Expected DriverUri result");
+                    };
+
                     assert_eq!(driver, exp_driver);
                     assert_eq!(conn, exp_conn);
                 }
+                Ok(DriverLocator::Profile(exp_profile)) => {
+                    let DriverLocator::Profile(profile) = 
result.expect("Expected Ok result")
+                    else {
+                        panic!("Expected Profile result");
+                    };
+
+                    assert_eq!(profile, exp_profile);
+                }
                 Err(exp_status) => {
                     let err = result.expect_err("Expected Err result");
                     assert_eq!(err.status, exp_status);
@@ -1447,7 +1662,11 @@ mod tests {
         }
         std::fs::write(&temp_db_path, b"").expect("Failed to create temporary 
database file");
 
-        let (driver, conn) = 
parse_driver_uri(temp_db_path.to_str().unwrap()).unwrap();
+        let DriverLocator::Uri(driver, conn) =
+            parse_driver_uri(temp_db_path.to_str().unwrap()).unwrap()
+        else {
+            panic!("Expected DriverUri result");
+        };
 
         assert_eq!(driver, temp_db_path.to_str().unwrap());
         assert_eq!(conn, "");
@@ -1456,4 +1675,167 @@ mod tests {
             .close()
             .expect("Failed to close/remove temporary directory");
     }
+
+    /// Table-driven check of `find_filesystem_profile` covering absolute
+    /// paths, names resolved through search directories, and a missing
+    /// profile.
+    #[test]
+    fn test_find_filesystem_profile() {
+        // (case name, profile name, extra search paths, pass an absolute
+        // path?, expect success?)
+        let test_cases = vec![
+            (
+                "absolute path with extension",
+                "test_profile.toml",
+                None,
+                true,
+                true,
+            ),
+            (
+                "relative name without extension",
+                "my_profile",
+                Some(vec![]),
+                false,
+                true,
+            ),
+            (
+                "relative name with extension",
+                "my_profile.toml",
+                Some(vec![]),
+                false,
+                true,
+            ),
+            (
+                "absolute path without extension",
+                "profile_no_ext",
+                None,
+                true,
+                true,
+            ),
+            (
+                "nonexistent profile",
+                "nonexistent_profile",
+                None,
+                false,
+                false,
+            ),
+        ];
+
+        for case in test_cases {
+            let (name, profile_name, extra_dirs, is_absolute, should_succeed) = case;
+            let named_with_ext = profile_name.ends_with(".toml");
+
+            let tmp_dir = tempfile::Builder::new()
+                .prefix("adbc_profile_test")
+                .tempdir()
+                .unwrap();
+
+            // The file that a successful lookup must resolve to.
+            let file_name = if named_with_ext {
+                profile_name.to_string()
+            } else {
+                format!("{}.toml", profile_name)
+            };
+            let expected_profile_path = tmp_dir.path().join(file_name);
+
+            if should_succeed {
+                std::fs::write(&expected_profile_path, "test content").unwrap();
+            }
+
+            let search_paths = extra_dirs.map(|mut paths| {
+                paths.push(tmp_dir.path().to_path_buf());
+                paths
+            });
+
+            // Absolute cases pass a full path inside the temp dir, keeping
+            // the extension exactly as the case spelled it.
+            let profile_arg = match (is_absolute, named_with_ext) {
+                (false, _) => profile_name.to_string(),
+                (true, true) => expected_profile_path.to_str().unwrap().to_string(),
+                (true, false) => tmp_dir
+                    .path()
+                    .join(profile_name)
+                    .to_str()
+                    .unwrap()
+                    .to_string(),
+            };
+
+            let result = find_filesystem_profile(&profile_arg, search_paths);
+
+            if should_succeed {
+                match result {
+                    Ok(found) => assert_eq!(
+                        found, expected_profile_path,
+                        "Test case '{}': path mismatch",
+                        name
+                    ),
+                    Err(e) => panic!("Test case '{}' failed: {:?}", name, Some(e)),
+                }
+            } else {
+                let Err(err) = result else {
+                    panic!("Test case '{}': expected error", name);
+                };
+                assert_eq!(
+                    err.status,
+                    Status::NotFound,
+                    "Test case '{}': wrong error status",
+                    name
+                );
+                assert!(
+                    err.message.contains("Profile not found"),
+                    "Test case '{}': wrong error message",
+                    name
+                );
+            }
+
+            tmp_dir.close().unwrap();
+        }
+    }
+
+    /// A profile that exists only in the second search directory is still
+    /// found.
+    #[test]
+    fn test_find_filesystem_profile_search_multiple_paths() {
+        let new_dir = |prefix: &str| {
+            tempfile::Builder::new()
+                .prefix(prefix)
+                .tempdir()
+                .unwrap()
+        };
+        let tmp_dir1 = new_dir("adbc_profile_test1");
+        let tmp_dir2 = new_dir("adbc_profile_test2");
+
+        // Create profile in second directory
+        let profile_path = tmp_dir2.path().join("searched_profile.toml");
+        std::fs::write(&profile_path, "test content").unwrap();
+
+        let search_dirs = vec![
+            tmp_dir1.path().to_path_buf(),
+            tmp_dir2.path().to_path_buf(),
+        ];
+        let result = find_filesystem_profile("searched_profile", Some(search_dirs));
+        assert!(result.is_ok());
+        assert_eq!(result.unwrap(), profile_path);
+
+        tmp_dir1.close().unwrap();
+        tmp_dir2.close().unwrap();
+    }
+
+    /// Caller-supplied directories appear in the returned search path list.
+    #[test]
+    fn test_get_profile_search_paths() {
+        let tmp_dir = tempfile::Builder::new()
+            .prefix("adbc_profile_test")
+            .tempdir()
+            .unwrap();
+        let extra_dir = tmp_dir.path().to_path_buf();
+
+        let paths = get_profile_search_paths(Some(vec![extra_dir.clone()]));
+
+        assert!(paths.contains(&extra_dir));
+        assert!(!paths.is_empty());
+
+        tmp_dir.close().unwrap();
+    }
+
+    /// Calling `get_profile_search_paths` without additional paths must not
+    /// panic.
+    ///
+    /// The returned list depends on the environment (`ADBC_PROFILE_PATH`,
+    /// the user config directory), so its contents are deliberately not
+    /// asserted here.
+    #[test]
+    fn test_get_profile_search_paths_empty() {
+        let _paths = get_profile_search_paths(None);
+    }
 }
diff --git a/rust/driver_manager/tests/connection_profile.rs 
b/rust/driver_manager/tests/connection_profile.rs
new file mode 100644
index 000000000..1da5d87e6
--- /dev/null
+++ b/rust/driver_manager/tests/connection_profile.rs
@@ -0,0 +1,571 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::path::PathBuf;
+
+use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue};
+use adbc_core::{error::Status, LOAD_FLAG_DEFAULT};
+use adbc_driver_manager::profile::{
+    ConnectionProfile, ConnectionProfileProvider, FilesystemProfileProvider,
+};
+use adbc_driver_manager::ManagedDatabase;
+
+mod common;
+
+/// Creates a fresh temporary directory containing `<profile_name>.toml`
+/// with the given `content`.
+///
+/// Returns the temporary directory handle together with the full path of
+/// the written profile.
+fn write_profile_to_tempfile(
+    profile_name: &str,
+    content: &str,
+) -> (tempfile::TempDir, PathBuf) {
+    let dir = tempfile::Builder::new()
+        .prefix("adbc_profile_tests")
+        .tempdir()
+        .expect("Failed to create temporary directory for profile tests");
+
+    let file_path = dir.path().join(format!("{}.toml", profile_name));
+    std::fs::write(&file_path, content)
+        .expect("Failed to write profile to temporary file");
+
+    (dir, file_path)
+}
+
+/// Minimal valid profile: version 1, the SQLite driver, and a single `uri`
+/// option.
+fn simple_profile() -> String {
+    let toml = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+"#;
+    toml.to_owned()
+}
+
+/// Profile whose `[options]` table contains nested sub-tables two levels
+/// deep.
+fn profile_with_nested_options() -> String {
+    let toml = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+[options.connection]
+timeout = 30
+retry = true
+[options.connection.pool]
+max_size = 10
+min_size = 2
+idle_timeout = 300.5
+"#;
+    toml.to_owned()
+}
+
+/// Profile with one string, integer, float, and boolean option in addition
+/// to `uri`.
+fn profile_with_all_types() -> String {
+    let toml = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+string_opt = "test_value"
+int_opt = 42
+float_opt = 12.34
+bool_opt = true
+"#;
+    toml.to_owned()
+}
+
+/// Profile that omits the `driver` key.
+fn profile_without_driver() -> String {
+    let toml = r#"
+version = 1
+
+[options]
+uri = ":memory:"
+"#;
+    toml.to_owned()
+}
+
+/// Profile that omits the `[options]` table.
+fn profile_without_options() -> String {
+    let toml = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+"#;
+    toml.to_owned()
+}
+
+/// Profile declaring `version = 2`, used by the unsupported-version error
+/// case.
+fn profile_with_unsupported_version() -> String {
+    let toml = r#"
+version = 2
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+"#;
+    toml.to_owned()
+}
+
+/// Text that is not valid TOML: the `[options` table header is never
+/// closed.
+fn invalid_toml() -> &'static str {
+    const BROKEN: &str = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+[options
+uri = ":memory:"
+"#;
+    BROKEN
+}
+
+/// Loading the minimal profile yields the driver name, no init function,
+/// and exactly one `uri` option.
+#[test]
+fn test_filesystem_profile_load_simple() {
+    let (tmp_dir, profile_path) =
+        write_profile_to_tempfile("simple", &simple_profile());
+
+    let search_dirs = vec![tmp_dir.path().to_path_buf()];
+    let profile = FilesystemProfileProvider
+        .get_profile(profile_path.to_str().unwrap(), Some(search_dirs))
+        .unwrap();
+
+    let (driver_name, init_func) = profile.get_driver_name().unwrap();
+    assert_eq!(driver_name, "adbc_driver_sqlite");
+    assert!(init_func.is_none());
+
+    let options: Vec<_> = profile.get_options().unwrap().into_iter().collect();
+    assert_eq!(options.len(), 1);
+
+    let (key, value) = &options[0];
+    assert_eq!(key, &OptionDatabase::Uri);
+    let OptionValue::String(uri) = value else {
+        panic!("Expected string value for uri option");
+    };
+    assert_eq!(uri, ":memory:");
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+/// Nested option tables are flattened into dotted option keys.
+#[test]
+fn test_filesystem_profile_nested_options() {
+    let (tmp_dir, profile_path) =
+        write_profile_to_tempfile("nested", &profile_with_nested_options());
+
+    let search_dirs = vec![tmp_dir.path().to_path_buf()];
+    let profile = FilesystemProfileProvider
+        .get_profile(profile_path.to_str().unwrap(), Some(search_dirs))
+        .unwrap();
+
+    let options: Vec<_> = profile.get_options().unwrap().into_iter().collect();
+
+    // One option per leaf value in the profile.
+    let expected_keys = [
+        "uri",
+        "connection.timeout",
+        "connection.retry",
+        "connection.pool.max_size",
+        "connection.pool.min_size",
+        "connection.pool.idle_timeout",
+    ];
+    assert_eq!(options.len(), 6);
+
+    // Verify the nested key construction
+    let option_keys: Vec<String> = options
+        .iter()
+        .map(|(k, _)| k.as_ref().to_string())
+        .collect();
+
+    for expected in expected_keys {
+        assert!(option_keys.contains(&expected.to_string()));
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+/// Each TOML value type maps to the expected `OptionValue` variant;
+/// booleans are delivered as the strings `"true"` / `"false"`.
+#[test]
+fn test_filesystem_profile_all_option_types() {
+    let (tmp_dir, profile_path) =
+        write_profile_to_tempfile("all_types", &profile_with_all_types());
+
+    let search_dirs = vec![tmp_dir.path().to_path_buf()];
+    let profile = FilesystemProfileProvider
+        .get_profile(profile_path.to_str().unwrap(), Some(search_dirs))
+        .unwrap();
+
+    let options: Vec<_> = profile.get_options().unwrap().into_iter().collect();
+    assert_eq!(options.len(), 5);
+
+    for (key, value) in options {
+        // Pair each known key with the variant it must carry; a known key
+        // with any other variant falls through to that key's panic arm.
+        match (key.as_ref(), value) {
+            ("uri", OptionValue::String(s)) => assert_eq!(s, ":memory:"),
+            ("uri", _) => panic!("Expected string value for uri"),
+            ("string_opt", OptionValue::String(s)) => assert_eq!(s, "test_value"),
+            ("string_opt", _) => panic!("Expected string value for string_opt"),
+            ("int_opt", OptionValue::Int(i)) => assert_eq!(i, 42),
+            ("int_opt", _) => panic!("Expected int value for int_opt"),
+            ("float_opt", OptionValue::Double(f)) => {
+                assert!((f - 12.34).abs() < 1e-10)
+            }
+            ("float_opt", _) => panic!("Expected double value for float_opt"),
+            ("bool_opt", OptionValue::String(s)) => assert_eq!(s, "true"),
+            ("bool_opt", _) => panic!("Expected string value for bool_opt"),
+            (other, _) => panic!("Unexpected option key: {}", other),
+        }
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+/// Malformed profiles are rejected with the expected status and, where one
+/// is given, the expected message fragment.
+#[test]
+fn test_filesystem_profile_error_cases() {
+    // (case name, profile content, expected status, expected message
+    // fragment; an empty fragment skips the message check)
+    let test_cases = vec![
+        (
+            "without options",
+            profile_without_options(),
+            Status::InvalidArguments,
+            "missing or invalid 'options' table in profile",
+        ),
+        (
+            "unsupported version",
+            profile_with_unsupported_version(),
+            Status::InvalidArguments,
+            "unsupported profile version",
+        ),
+        (
+            "invalid toml",
+            invalid_toml().to_string(),
+            Status::InvalidArguments,
+            "",
+        ),
+    ];
+
+    for case in test_cases {
+        let (name, profile_content, expected_status, expected_msg_fragment) = case;
+        let (tmp_dir, profile_path) =
+            write_profile_to_tempfile(name, &profile_content);
+
+        let search_dirs = vec![tmp_dir.path().to_path_buf()];
+        let outcome = FilesystemProfileProvider
+            .get_profile(profile_path.to_str().unwrap(), Some(search_dirs));
+
+        let Err(err) = outcome else {
+            panic!("Test case '{}': expected error", name);
+        };
+        assert_eq!(
+            err.status, expected_status,
+            "Test case '{}': wrong status",
+            name
+        );
+        assert!(
+            expected_msg_fragment.is_empty()
+                || err.message.contains(expected_msg_fragment),
+            "Test case '{}': expected '{}' in error message, got '{}'",
+            name,
+            expected_msg_fragment,
+            err.message
+        );
+
+        tmp_dir
+            .close()
+            .expect("Failed to close/remove temporary directory");
+    }
+}
+
+#[test]
+fn test_filesystem_profile_not_found() {
+    // Look up a name that is not expected to exist, passing no additional search paths.
+    let provider = FilesystemProfileProvider;
+    let lookup = provider.get_profile("nonexistent_profile", None);
+    assert!(lookup.is_err());
+
+    let error = lookup.unwrap_err();
+    assert_eq!(error.status, Status::NotFound);
+    assert!(error.message.contains("Profile not found"));
+}
+
+#[test]
+fn test_filesystem_profile_without_driver() {
+    let (tmp_dir, profile_path) = write_profile_to_tempfile("no_driver", &profile_without_driver());
+    let search_dirs = vec![tmp_dir.path().to_path_buf()];
+
+    // Loading must succeed even though the profile names no driver.
+    let provider = FilesystemProfileProvider;
+    let lookup = provider.get_profile(profile_path.to_str().unwrap(), Some(search_dirs));
+    let profile = lookup.unwrap();
+
+    // A missing driver entry is reported as an empty driver name.
+    let (driver_name, _) = profile.get_driver_name().unwrap();
+    assert_eq!(driver_name, "");
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_database_from_uri_with_profile() {
+    let (tmp_dir, profile_path) = write_profile_to_tempfile("test_db", &simple_profile());
+
+    // A `profile://` URI that points straight at the profile file written above.
+    let profile_uri = format!("profile://{}", profile_path.display());
+
+    let loaded = ManagedDatabase::from_uri(
+        &profile_uri,
+        None,
+        AdbcVersion::V100,
+        LOAD_FLAG_DEFAULT,
+        None,
+    );
+    let database = loaded.unwrap();
+
+    common::test_database(&database);
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_database_from_uri_with_profile_additional_options() {
+    let (tmp_dir, profile_path) = write_profile_to_tempfile("test_opts", &simple_profile());
+    let profile_uri = format!("profile://{}", profile_path.display());
+
+    // Additional options should override profile options
+    let uri_override = (
+        OptionDatabase::Uri,
+        OptionValue::String("file::memory:".to_string()),
+    );
+    let additional_opts = vec![uri_override];
+
+    let loaded = ManagedDatabase::from_uri_with_opts(
+        &profile_uri,
+        None,
+        AdbcVersion::V100,
+        LOAD_FLAG_DEFAULT,
+        None,
+        additional_opts,
+    );
+    let database = loaded.unwrap();
+
+    common::test_database(&database);
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+fn test_profile_loading_scenarios() {
+    // Each case is (label, profile name, profile contents, pass the temp dir as an additional
+    // search path?, address the profile by absolute path instead of by name?).
+    let test_cases = vec![
+        // Addressed by bare name so that the additional search path is what locates the file.
+        // NOTE(review): assumes `write_profile_to_tempfile` writes `<name>.toml` directly
+        // inside the returned temp dir - confirm against the helper.
+        (
+            "search with additional paths",
+            "searchable",
+            simple_profile(),
+            true,
+            false,
+        ),
+        ("absolute path", "absolute", simple_profile(), false, true),
+    ];
+
+    for (name, profile_name, profile_content, use_search_path, use_absolute) in test_cases {
+        let (tmp_dir, profile_path) = write_profile_to_tempfile(profile_name, &profile_content);
+
+        let provider = FilesystemProfileProvider;
+        let search_paths = if use_search_path {
+            Some(vec![tmp_dir.path().to_path_buf()])
+        } else {
+            None
+        };
+
+        // Either hand over the full file location or just the profile's name.
+        let profile_arg = if use_absolute {
+            profile_path.to_str().unwrap()
+        } else {
+            profile_name
+        };
+
+        let profile = provider
+            .get_profile(profile_arg, search_paths)
+            .unwrap_or_else(|e| panic!("Test case '{}' failed: {:?}", name, e));
+
+        let (driver_name, _) = profile.get_driver_name().unwrap();
+        assert_eq!(
+            driver_name, "adbc_driver_sqlite",
+            "Test case '{}': driver mismatch",
+            name
+        );
+
+        tmp_dir
+            .close()
+            .expect("Failed to close/remove temporary directory");
+    }
+}
+
+#[test]
+fn test_profile_display() {
+    let (tmp_dir, profile_path) = write_profile_to_tempfile("display", &simple_profile());
+    let absolute_path = profile_path.to_str().unwrap();
+
+    let provider = FilesystemProfileProvider;
+    let profile = provider.get_profile(absolute_path, None).unwrap();
+
+    // The `Display` output must name the profile type and the file it was loaded from.
+    let rendered = profile.to_string();
+    assert!(rendered.contains("FilesystemProfile"));
+    assert!(rendered.contains(absolute_path));
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+/// Serializes the tests that override `ADBC_PROFILE_PATH`.
+///
+/// The environment is shared by the whole process and the test harness may run tests of one
+/// binary on several threads, so two tests pointing the variable at different directories
+/// would otherwise race with each other.
+static PROFILE_PATH_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
+
+/// Puts `ADBC_PROFILE_PATH` back to the captured state when dropped, which also covers a
+/// panic inside the guarded section.
+struct ProfilePathEnvRestore(Option<std::ffi::OsString>);
+
+impl Drop for ProfilePathEnvRestore {
+    fn drop(&mut self) {
+        match self.0.take() {
+            Some(val) => std::env::set_var("ADBC_PROFILE_PATH", val),
+            None => std::env::remove_var("ADBC_PROFILE_PATH"),
+        }
+    }
+}
+
+/// Runs `body` while `ADBC_PROFILE_PATH` is set to `dir`, then restores the previous value.
+fn with_profile_path_env<T>(dir: &std::path::Path, body: impl FnOnce() -> T) -> T {
+    // A poisoned lock only means another test failed; the `()` payload cannot be corrupted.
+    let _lock = PROFILE_PATH_ENV_LOCK
+        .lock()
+        .unwrap_or_else(|poisoned| poisoned.into_inner());
+    // Declared after the lock so it is dropped first: the variable is restored before the
+    // lock is released.
+    let _restore = ProfilePathEnvRestore(std::env::var_os("ADBC_PROFILE_PATH"));
+    std::env::set_var("ADBC_PROFILE_PATH", dir);
+    body()
+}
+
+#[test]
+fn test_profile_hierarchical_path_via_env_var() {
+    use std::env;
+
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_profile_env_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Create a hierarchical directory structure: databases/postgres/
+    let databases_dir = tmp_dir.path().join("databases");
+    let postgres_dir = databases_dir.join("postgres");
+    std::fs::create_dir_all(&postgres_dir).expect("Failed to create subdirectories");
+
+    // Create a profile in the nested directory
+    let profile_path = postgres_dir.join("production.toml");
+    std::fs::write(&profile_path, simple_profile()).expect("Failed to write profile");
+
+    // Verify the file was created
+    assert!(
+        profile_path.is_file(),
+        "Profile file should exist at {}",
+        profile_path.display()
+    );
+
+    // Point ADBC_PROFILE_PATH at the parent directory only for the duration of the lookup
+    let provider = FilesystemProfileProvider;
+    let result = with_profile_path_env(tmp_dir.path(), || {
+        // Verify the environment variable is set correctly
+        assert_eq!(
+            env::var_os("ADBC_PROFILE_PATH").as_deref(),
+            Some(tmp_dir.path().as_os_str())
+        );
+
+        // Try to load the profile using hierarchical relative path
+        provider.get_profile("databases/postgres/production", None)
+    });
+
+    // Verify the profile was loaded successfully
+    let profile = result.expect("Failed to load profile from hierarchical path");
+    let (driver_name, _) = profile.get_driver_name().unwrap();
+    assert_eq!(driver_name, "adbc_driver_sqlite");
+
+    // Verify it loaded from the correct path
+    let display_str = format!("{}", profile);
+    let expected_path = profile_path.to_string_lossy();
+    assert!(
+        display_str.contains(&*expected_path),
+        "display: {}, profile_path: {}",
+        display_str,
+        expected_path
+    );
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+fn test_profile_hierarchical_path_with_extension_via_env_var() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_profile_env_test2")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Create a hierarchical directory structure: configs/dev/
+    let configs_dir = tmp_dir.path().join("configs");
+    let dev_dir = configs_dir.join("dev");
+    std::fs::create_dir_all(&dev_dir).expect("Failed to create subdirectories");
+
+    // Create a profile in the nested directory
+    let profile_path = dev_dir.join("database.toml");
+    std::fs::write(&profile_path, simple_profile()).expect("Failed to write profile");
+
+    // Try to load the profile using hierarchical relative path with .toml extension while
+    // ADBC_PROFILE_PATH points at the parent directory
+    let provider = FilesystemProfileProvider;
+    let result = with_profile_path_env(tmp_dir.path(), || {
+        provider.get_profile("configs/dev/database.toml", None)
+    });
+
+    // Verify the profile was loaded successfully
+    let profile = result.expect("Failed to load profile from hierarchical path with extension");
+    let (driver_name, _) = profile.get_driver_name().unwrap();
+    assert_eq!(driver_name, "adbc_driver_sqlite");
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+fn test_profile_hierarchical_path_additional_search_paths() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_profile_hier_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Build the nested layout projects/myapp/ below the temporary directory.
+    let myapp_dir = tmp_dir.path().join("projects").join("myapp");
+    std::fs::create_dir_all(&myapp_dir).expect("Failed to create subdirectories");
+
+    // Place the profile in the innermost directory.
+    let profile_path = myapp_dir.join("local.toml");
+    std::fs::write(&profile_path, simple_profile()).expect("Failed to write profile");
+
+    // Resolve the profile by its relative, extension-less path; only the temporary
+    // directory is handed over as an additional search path.
+    let search_dirs = vec![tmp_dir.path().to_path_buf()];
+    let provider = FilesystemProfileProvider;
+    let result = provider.get_profile("projects/myapp/local", Some(search_dirs));
+
+    // The lookup has to succeed and yield the driver named in the profile.
+    let profile = result.expect("Failed to load profile from hierarchical path");
+    let (driver_name, _) = profile.get_driver_name().unwrap();
+    assert_eq!(driver_name, "adbc_driver_sqlite");
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
diff --git a/rust/driver_manager/tests/test_env_var_profiles.rs b/rust/driver_manager/tests/test_env_var_profiles.rs
new file mode 100644
index 000000000..04a972d41
--- /dev/null
+++ b/rust/driver_manager/tests/test_env_var_profiles.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::env;
+use std::path::PathBuf;
+
+use adbc_core::options::AdbcVersion;
+use adbc_core::{error::Status, LOAD_FLAG_DEFAULT};
+use adbc_driver_manager::ManagedDatabase;
+
+/// Writes `content` to `<name>.toml` inside `tmp_dir` and returns the path of that file.
+///
+/// Panics if the file cannot be written.
+fn write_profile_to_tempfile(tmp_dir: &tempfile::TempDir, name: &str, content: &str) -> PathBuf {
+    let file_name = [name, ".toml"].concat();
+    let destination = tmp_dir.path().join(file_name);
+    std::fs::write(&destination, content).expect("Failed to write profile");
+    destination
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_basic() {
+    const VAR: &str = "ADBC_TEST_ENV_VAR";
+
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Remember the variable's current state so it can be put back afterwards.
+    let saved = env::var_os(VAR);
+    env::set_var(VAR, ":memory:");
+
+    // The entire `uri` option comes from the environment variable.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = "{{ env_var(ADBC_TEST_ENV_VAR) }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+    let result = ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None);
+
+    // Undo the override before checking the outcome.
+    if let Some(val) = saved {
+        env::set_var(VAR, val);
+    } else {
+        env::remove_var(VAR);
+    }
+
+    // Successful creation is the whole check; the handle itself is not needed.
+    let database = result.unwrap_or_else(|e| {
+        panic!(
+            "Failed to create database with env_var replacement: {}",
+            e.message
+        )
+    });
+    drop(database);
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_empty() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Make sure the env var doesn't exist
+    env::remove_var("ADBC_NONEXISTENT_VAR_12345");
+
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "{{ env_var(ADBC_NONEXISTENT_VAR_12345) }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    // The undefined variable is replaced by an empty string. `test_option` is rejected by
+    // the driver, and the rejection message echoes the substituted (empty) value, which is
+    // how the replacement is observed.
+    match ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None) {
+        Ok(_) => panic!("Expected the driver to reject the unknown option 'test_option'"),
+        Err(err) => assert_eq!(
+            err.message,
+            "[SQLite] Unknown database option test_option=''"
+        ),
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_missing_closing_paren() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // The `env_var(` call below is never closed.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "{{ env_var(SOME_VAR }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    match ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None) {
+        Ok(_) => panic!("Expected error for malformed env_var"),
+        Err(err) => {
+            assert_eq!(err.status, Status::InvalidArguments);
+            assert!(
+                err.message
+                    .contains("invalid profile replacement expression"),
+                "Error message should mention invalid expression, got: {}",
+                err.message
+            );
+        }
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_missing_arg() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // `env_var()` is called without naming a variable.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "{{ env_var() }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    match ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None) {
+        Ok(_) => panic!("Expected error for empty env_var argument"),
+        Err(err) => {
+            assert_eq!(err.status, Status::InvalidArguments);
+            assert!(
+                err.message.contains("empty environment variable name"),
+                "Error message should mention empty variable name, got: {}",
+                err.message
+            );
+        }
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_interpolation() {
+    const VAR: &str = "ADBC_TEST_INTERPOLATE";
+
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Set a test environment variable, remembering its previous state
+    let prev_value = env::var_os(VAR);
+    env::set_var(VAR, "middle_value");
+
+    // The replacement sits in the middle of literal text.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "prefix_{{ env_var(ADBC_TEST_INTERPOLATE) }}_suffix"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    let result = ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None);
+
+    // Restore environment variable
+    match prev_value {
+        Some(val) => env::set_var(VAR, val),
+        None => env::remove_var(VAR),
+    }
+
+    // `test_option` is rejected by the driver; the rejection message echoes the value after
+    // substitution, which is how the interpolation is observed.
+    match result {
+        Ok(_) => panic!("Expected the driver to reject the unknown option 'test_option'"),
+        Err(err) => assert_eq!(
+            err.message,
+            "[SQLite] Unknown database option test_option='prefix_middle_value_suffix'"
+        ),
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_multiple() {
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Set test environment variables, remembering their previous state
+    let prev_var1 = env::var_os("ADBC_TEST_VAR1");
+    let prev_var2 = env::var_os("ADBC_TEST_VAR2");
+    env::set_var("ADBC_TEST_VAR1", "first");
+    env::set_var("ADBC_TEST_VAR2", "second");
+
+    // Two replacements appear within a single option value.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "{{ env_var(ADBC_TEST_VAR1) }}_and_{{ env_var(ADBC_TEST_VAR2) }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    let result = ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None);
+
+    // Restore environment variables
+    for (var, previous) in [("ADBC_TEST_VAR1", prev_var1), ("ADBC_TEST_VAR2", prev_var2)] {
+        match previous {
+            Some(val) => env::set_var(var, val),
+            None => env::remove_var(var),
+        }
+    }
+
+    // `test_option` is rejected by the driver; the rejection message echoes the value after
+    // substitution, which is how both replacements are observed.
+    match result {
+        Ok(_) => panic!("Expected the driver to reject the unknown option 'test_option'"),
+        Err(err) => assert_eq!(
+            err.message,
+            "[SQLite] Unknown database option test_option='first_and_second'"
+        ),
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}
+
+#[test]
+#[cfg_attr(not(feature = "driver_manager_test_lib"), ignore)]
+fn test_env_var_replacement_whitespace() {
+    const VAR: &str = "ADBC_TEST_WHITESPACE";
+
+    let tmp_dir = tempfile::Builder::new()
+        .prefix("adbc_env_var_test")
+        .tempdir()
+        .expect("Failed to create temporary directory");
+
+    // Set a test environment variable, remembering its previous state
+    let prev_value = env::var_os(VAR);
+    env::set_var(VAR, "value");
+
+    // Extra whitespace surrounds both the call and its argument.
+    let profile_content = r#"
+version = 1
+driver = "adbc_driver_sqlite"
+
+[options]
+uri = ":memory:"
+test_option = "{{   env_var(  ADBC_TEST_WHITESPACE  )   }}"
+"#;
+
+    let profile_path = write_profile_to_tempfile(&tmp_dir, "test", profile_content);
+    let uri = format!("profile://{}", profile_path.display());
+
+    let result = ManagedDatabase::from_uri(&uri, None, AdbcVersion::V100, LOAD_FLAG_DEFAULT, None);
+
+    // Restore environment variable
+    match prev_value {
+        Some(val) => env::set_var(VAR, val),
+        None => env::remove_var(VAR),
+    }
+
+    // `test_option` is rejected by the driver; the rejection message echoes the value after
+    // substitution, showing that the padded expression was still resolved.
+    match result {
+        Ok(_) => panic!("Expected the driver to reject the unknown option 'test_option'"),
+        Err(err) => assert_eq!(
+            err.message,
+            "[SQLite] Unknown database option test_option='value'"
+        ),
+    }
+
+    tmp_dir
+        .close()
+        .expect("Failed to close/remove temporary directory");
+}

Reply via email to