This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch iceberg-fix
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c2e8973aefcace279a8db0d7c6231074785265ce
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Feb 10 13:12:03 2026 +0100

    fix(connectors): prevent crash when created() is unsupported, improve solib path resolve
    
    Fall back to modified() when created() is unsupported (XFS,
    Overlay2). Replace .expect() panics with error propagation.
    Improve plugin shared library resolution: search binary dir,
    cwd, /usr/lib, /usr/lib64, /lib with actionable diagnostics.
    
    Fixes #2712
---
 .../src/configs/connectors/local_provider.rs       |  27 +++-
 core/connectors/runtime/src/main.rs                | 155 +++++++++++++++++++--
 core/connectors/runtime/src/sink.rs                |   2 +-
 core/connectors/runtime/src/source.rs              |   2 +-
 4 files changed, 171 insertions(+), 15 deletions(-)

diff --git a/core/connectors/runtime/src/configs/connectors/local_provider.rs b/core/connectors/runtime/src/configs/connectors/local_provider.rs
index 801eff73a..5080d796d 100644
--- a/core/connectors/runtime/src/configs/connectors/local_provider.rs
+++ b/core/connectors/runtime/src/configs/connectors/local_provider.rs
@@ -198,15 +198,34 @@ impl LocalConnectorsConfigProvider<Created> {
                 debug!("Loaded base configuration: {:?}", base_config);
                 let path = path
                     .to_str()
-                    .expect("Failed to convert connector configuration path to 
string")
+                    .ok_or_else(|| {
+                        RuntimeError::InvalidConfiguration(format!(
+                            "Non-UTF8 connector config path: {}",
+                            path.display()
+                        ))
+                    })?
                     .to_string();
                 let connector_config: ConnectorConfig =
                     Self::create_file_config_provider(path.clone(), 
&base_config)
                         .load_config()
                         .await
-                        .expect("Failed to load connector configuration");
-
-                let created_at: DateTime<Utc> = 
entry.metadata()?.created()?.into();
+                        .map_err(|e| {
+                            RuntimeError::InvalidConfiguration(format!(
+                                "Failed to load connector configuration from 
'{path}': {e}"
+                            ))
+                        })?;
+
+                let metadata = entry.metadata()?;
+                let created_at: DateTime<Utc> = metadata
+                    .created()
+                    .or_else(|_| metadata.modified())
+                    .map(Into::into)
+                    .unwrap_or_else(|_| {
+                        warn!(
+                            "Could not read created or modified time for 
'{path}', using current time",
+                        );
+                        Utc::now()
+                    });
                 let connector_id: ConnectorId = (&connector_config).into();
                 let version = connector_config.version();
 
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
index 5d55be603..52a853c96 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -66,7 +66,7 @@ static GLOBAL: MiMalloc = MiMalloc;
 struct Args {}
 
 static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
-const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
+const ALLOWED_PLUGIN_EXTENSIONS: [&str; 2] = ["so", "dylib"];
 const DEFAULT_CONFIG_PATH: &str = "core/connectors/runtime/config.toml";
 
 #[derive(WrapperApi)]
@@ -266,21 +266,96 @@ async fn main() -> Result<(), RuntimeError> {
     Ok(())
 }
 
-pub(crate) fn resolve_plugin_path(path: &str) -> String {
-    let extension = path.split('.').next_back().unwrap_or_default();
-    if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) {
+/// Resolves a plugin shared library path from the connector config `path` field.
+///
+/// Accepts both `plugin.so` and `plugin` (OS-specific extension appended if missing).
+/// For absolute paths, checks existence at the literal location.
+/// For relative paths, searches in order:
+///   1. Literal path (relative to cwd)
+///   2. Next to the runtime binary
+///   3. Current working directory
+///   4. /usr/lib
+///   5. /usr/lib64
+///   6. /lib
+pub(crate) fn resolve_plugin_path(path: &str) -> Result<String, RuntimeError> {
+    let extension = std::path::Path::new(path)
+        .extension()
+        .and_then(|e| e.to_str())
+        .unwrap_or_default();
+    let with_extension = if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) {
         path.to_string()
     } else {
-        let os = std::env::consts::OS;
-        let os_extension = match os {
-            "windows" => "dll",
+        let os_extension = match std::env::consts::OS {
             "macos" => "dylib",
             _ => "so",
         };
-
-        debug!("Resolved plugin path: {path}.{os_extension} for detected OS: 
{os}");
         format!("{path}.{os_extension}")
+    };
+
+    let candidate = std::path::Path::new(&with_extension);
+
+    if candidate.exists() {
+        debug!("Resolved plugin path: {with_extension}");
+        return Ok(with_extension);
+    }
+
+    if candidate.is_relative() {
+        let Some(file_name) = candidate.file_name() else {
+            return Err(RuntimeError::InvalidConfiguration(format!(
+                "Invalid plugin path: '{with_extension}'"
+            )));
+        };
+
+        let search_dirs: Vec<std::path::PathBuf> = [
+            std::env::current_exe()
+                .ok()
+                .and_then(|p| p.parent().map(|d| d.to_path_buf())),
+            std::env::current_dir().ok(),
+            Some(std::path::PathBuf::from("/usr/lib")),
+            Some(std::path::PathBuf::from("/usr/lib64")),
+            Some(std::path::PathBuf::from("/lib")),
+        ]
+        .into_iter()
+        .flatten()
+        .collect();
+
+        for dir in &search_dirs {
+            let full = dir.join(file_name);
+            if full.exists() {
+                let resolved = match full.to_str() {
+                    Some(s) => s.to_owned(),
+                    None => continue,
+                };
+                debug!(
+                    "Resolved plugin path: {resolved} (found in {})",
+                    dir.display()
+                );
+                return Ok(resolved);
+            }
+        }
+
+        let searched: Vec<String> = std::iter::once(with_extension.clone())
+            .chain(search_dirs.iter().filter_map(|d| {
+                let full = d.join(file_name);
+                full.to_str().map(|s| s.to_owned())
+            }))
+            .collect();
+
+        return Err(RuntimeError::InvalidConfiguration(format!(
+            "Plugin library not found. Searched paths:\n{}\n\
+             Ensure the shared library (.so/.dylib) is built and placed in one 
of these locations.",
+            searched
+                .iter()
+                .map(|p| format!("  - {p}"))
+                .collect::<Vec<_>>()
+                .join("\n")
+        )));
     }
+
+    Err(RuntimeError::InvalidConfiguration(format!(
+        "Plugin library not found at '{with_extension}'. \
+         Ensure the shared library file exists at this path."
+    )))
 }
 
 struct SinkConnector {
@@ -350,3 +425,65 @@ struct SourceConnectorWrapper {
     callback: HandleCallback,
     plugins: Vec<SourceConnectorPlugin>,
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::fs;
+    use tempfile::TempDir;
+
+    #[test]
+    fn path_with_known_extension_is_preserved() {
+        let result = resolve_plugin_path("/tmp/nonexistent_test_plugin.so");
+        let err = result.unwrap_err().to_string();
+        assert!(
+            err.contains("nonexistent_test_plugin.so"),
+            "Error should mention the .so path, got: {err}"
+        );
+    }
+
+    #[test]
+    fn path_without_extension_gets_os_suffix() {
+        let result = resolve_plugin_path("/tmp/nonexistent_test_plugin");
+        let err = result.unwrap_err().to_string();
+        let expected_ext = match std::env::consts::OS {
+            "macos" => "dylib",
+            _ => "so",
+        };
+        assert!(
+            err.contains(&format!("nonexistent_test_plugin.{expected_ext}")),
+            "Error should mention OS-specific extension, got: {err}"
+        );
+    }
+
+    #[test]
+    fn nonexistent_relative_path_lists_searched_locations() {
+        let result = resolve_plugin_path("nonexistent_test_plugin.so");
+        let err = result.unwrap_err().to_string();
+        assert!(
+            err.contains("Searched paths:"),
+            "Should list searched paths, got: {err}"
+        );
+    }
+
+    #[test]
+    fn nonexistent_absolute_path_returns_specific_error() {
+        let result = resolve_plugin_path("/no/such/dir/plugin.so");
+        let err = result.unwrap_err().to_string();
+        assert!(
+            err.contains("/no/such/dir/plugin.so"),
+            "Should mention the exact path, got: {err}"
+        );
+    }
+
+    #[test]
+    fn existing_file_resolves_directly() {
+        let dir = TempDir::new().unwrap();
+        let plugin_path = dir.path().join("test_plugin.so");
+        fs::write(&plugin_path, b"fake-plugin").unwrap();
+
+        let result = resolve_plugin_path(plugin_path.to_str().unwrap())
+            .expect("should resolve existing file");
+        assert_eq!(result, plugin_path.to_str().unwrap());
+    }
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 092e41680..fb53cdb50 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -57,7 +57,7 @@ pub async fn init(
         }
 
         let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
-        let path = resolve_plugin_path(&config.path);
+        let path = resolve_plugin_path(&config.path)?;
         info!(
             "Initializing sink container with name: {name} ({key}), config 
version: {}, plugin: {path}",
             &config.version
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index b665a1310..889932ecb 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -67,7 +67,7 @@ pub async fn init(
         }
 
         let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
-        let path = resolve_plugin_path(&config.path);
+        let path = resolve_plugin_path(&config.path)?;
         info!(
             "Initializing source container with name: {name} ({key}), config 
version: {}, plugin: {path}",
             &config.version

Reply via email to