This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new bb5388b4 remove plugin subsystem ... (#1070)
bb5388b4 is described below

commit bb5388b425146fa57932a352ca74e96ac2c3d5f8
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 00:28:13 2024 +0100

    remove plugin subsystem ... (#1070)
    
    ... as it was never finished
---
 ballista/core/Cargo.toml                   |   1 -
 ballista/core/src/lib.rs                   |   2 -
 ballista/core/src/plugin/mod.rs            | 128 ------------------------
 ballista/core/src/plugin/plugin_manager.rs | 150 ----------------------------
 ballista/core/src/plugin/udf.rs            | 151 -----------------------------
 5 files changed, 432 deletions(-)

diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 8a01f56f..daabe8a2 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -60,7 +60,6 @@ futures = "0.3"
 hashbrown = "0.14"
 
 itertools = "0.12"
-libloading = "0.8.0"
 log = "0.4"
 md-5 = { version = "^0.10.0" }
 object_store = { workspace = true }
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index 301bc050..5306e8b9 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -31,8 +31,6 @@ pub mod error;
 pub mod event_loop;
 pub mod execution_plans;
 pub mod object_store_registry;
-/// some plugins
-pub mod plugin;
 pub mod utils;
 
 #[macro_use]
diff --git a/ballista/core/src/plugin/mod.rs b/ballista/core/src/plugin/mod.rs
deleted file mode 100644
index 8d2cd6b9..00000000
--- a/ballista/core/src/plugin/mod.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::Result;
-use crate::plugin::udf::UDFPluginManager;
-use libloading::Library;
-use std::any::Any;
-use std::env;
-use std::sync::Arc;
-
-/// plugin manager
-pub mod plugin_manager;
-/// udf plugin
-pub mod udf;
-
-/// CARGO_PKG_VERSION
-pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
-/// RUSTC_VERSION
-pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION");
-
-/// Top plugin trait
-pub trait Plugin {
-    /// Returns the plugin as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-}
-
-/// The enum of Plugin
-#[repr(C)]
-#[derive(PartialEq, std::cmp::Eq, std::hash::Hash, Copy, Clone)]
-pub enum PluginEnum {
-    /// UDF/UDAF plugin
-    UDF,
-}
-
-impl PluginEnum {
-    /// new a struct which impl the PluginRegistrar trait
-    pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
-        match self {
-            PluginEnum::UDF => Box::<UDFPluginManager>::default(),
-        }
-    }
-}
-
-/// Every plugin need a PluginDeclaration
-#[derive(Copy, Clone)]
-pub struct PluginDeclaration {
-    /// Rust doesn’t have a stable ABI, meaning different compiler versions can generate incompatible code.
-    /// For these reasons, the UDF plug-in must be compiled using the same version of rustc as datafusion.
-    pub rustc_version: &'static str,
-
-    /// core version of the plugin. The plugin's core_version need same as plugin manager.
-    pub core_version: &'static str,
-
-    /// One of PluginEnum
-    pub plugin_type: unsafe extern "C" fn() -> PluginEnum,
-}
-
-/// Plugin Registrar , Every plugin need implement this trait
-pub trait PluginRegistrar: Send + Sync + 'static {
-    /// # Safety
-    /// load plugin from library
-    unsafe fn load(&mut self, library: Arc<Library>) -> Result<()>;
-
-    /// Returns the plugin as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-}
-
-/// Declare a plugin's PluginDeclaration.
-///
-/// # Notes
-///
-/// This works by automatically generating an `extern "C"` function named `get_plugin_type` with a
-/// pre-defined signature and symbol name. And then generating a PluginDeclaration.
-/// Therefore you will only be able to declare one plugin per library.
-#[macro_export]
-macro_rules! declare_plugin {
-    ($plugin_type:expr) => {
-        #[no_mangle]
-        pub extern "C" fn get_plugin_type() -> $crate::plugin::PluginEnum {
-            $plugin_type
-        }
-
-        #[no_mangle]
-        pub static plugin_declaration: $crate::plugin::PluginDeclaration =
-            $crate::plugin::PluginDeclaration {
-                rustc_version: $crate::plugin::RUSTC_VERSION,
-                core_version: $crate::plugin::CORE_VERSION,
-                plugin_type: get_plugin_type,
-            };
-    };
-}
-
-/// get the plugin dir
-pub fn plugin_dir() -> String {
-    let current_exe_dir = match env::current_exe() {
-        Ok(exe_path) => exe_path.display().to_string(),
-        Err(_e) => "".to_string(),
-    };
-
-    // If current_exe_dir contain `deps` the root dir is the parent dir
-    // eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/deps/plugins_app-067452b3ff2af70e
-    // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/deps
-    // else eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/plugins_app
-    // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/
-    if current_exe_dir.contains("/deps/") {
-        let i = current_exe_dir.find("/deps/").unwrap();
-        String::from(&current_exe_dir.as_str()[..i + 6])
-    } else {
-        let i = current_exe_dir.rfind('/').unwrap();
-        String::from(&current_exe_dir.as_str()[..i])
-    }
-}
diff --git a/ballista/core/src/plugin/plugin_manager.rs b/ballista/core/src/plugin/plugin_manager.rs
deleted file mode 100644
index 6c19f054..00000000
--- a/ballista/core/src/plugin/plugin_manager.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use crate::error::{BallistaError, Result};
-use libloading::Library;
-use log::info;
-use std::collections::HashMap;
-use std::io;
-use std::sync::{Arc, Mutex};
-use walkdir::{DirEntry, WalkDir};
-
-use crate::plugin::{
-    PluginDeclaration, PluginEnum, PluginRegistrar, CORE_VERSION, RUSTC_VERSION,
-};
-use once_cell::sync::OnceCell;
-
-/// To prevent the library from being loaded multiple times, we use once_cell defines a Arc<Mutex<GlobalPluginManager>>
-/// Because datafusion is a library, not a service, users may not need to load all plug-ins in the process.
-/// So fn global_plugin_manager return Arc<Mutex<GlobalPluginManager>>. In this way, users can load the required library through the load method of GlobalPluginManager when needed
-static INSTANCE: OnceCell<Arc<Mutex<GlobalPluginManager>>> = OnceCell::new();
-
-/// global_plugin_manager
-pub fn global_plugin_manager(
-    plugin_path: &str,
-) -> &'static Arc<Mutex<GlobalPluginManager>> {
-    INSTANCE.get_or_init(move || unsafe {
-        let mut gpm = GlobalPluginManager::default();
-        gpm.load(plugin_path).unwrap();
-        Arc::new(Mutex::new(gpm))
-    })
-}
-
-#[derive(Default)]
-/// manager all plugin_type's plugin_manager
-pub struct GlobalPluginManager {
-    /// every plugin need a plugin registrar
-    pub plugin_managers: HashMap<PluginEnum, Box<dyn PluginRegistrar>>,
-
-    /// loaded plugin files
-    pub plugin_files: Vec<String>,
-}
-
-impl GlobalPluginManager {
-    /// # Safety
-    /// find plugin file from `plugin_path` and load it .
-    unsafe fn load(&mut self, plugin_path: &str) -> Result<()> {
-        if "".eq(plugin_path) {
-            return Ok(());
-        }
-        // find library file from udaf_plugin_path
-        info!("load plugin from dir:{}", plugin_path);
-
-        let plugin_files = self.get_all_plugin_files(plugin_path)?;
-
-        for plugin_file in plugin_files {
-            let library = Library::new(plugin_file.path()).map_err(|e| {
-                BallistaError::IoError(io::Error::new(
-                    io::ErrorKind::Other,
-                    format!("load library error: {e}"),
-                ))
-            })?;
-
-            let library = Arc::new(library);
-
-            let dec = library.get::<*mut PluginDeclaration>(b"plugin_declaration\0");
-            if dec.is_err() {
-                info!(
-                    "not found plugin_declaration in the library: {}",
-                    plugin_file.path().to_str().unwrap()
-                );
-                continue;
-            }
-
-            let dec = dec.unwrap().read();
-
-            // ersion checks to prevent accidental ABI incompatibilities
-            if dec.rustc_version != RUSTC_VERSION || dec.core_version != CORE_VERSION {
-                return Err(BallistaError::IoError(io::Error::new(
-                    io::ErrorKind::Other,
-                    "Version mismatch",
-                )));
-            }
-
-            let plugin_enum = (dec.plugin_type)();
-            let curr_plugin_manager = match self.plugin_managers.get_mut(&plugin_enum) {
-                None => {
-                    let plugin_manager = plugin_enum.init_plugin_manager();
-                    self.plugin_managers.insert(plugin_enum, plugin_manager);
-                    self.plugin_managers.get_mut(&plugin_enum).unwrap()
-                }
-                Some(manager) => manager,
-            };
-            curr_plugin_manager.load(library)?;
-            self.plugin_files
-                .push(plugin_file.path().to_str().unwrap().to_string());
-        }
-
-        Ok(())
-    }
-
-    /// get all plugin file in the dir
-    fn get_all_plugin_files(&self, plugin_path: &str) -> io::Result<Vec<DirEntry>> {
-        let mut plugin_files = Vec::new();
-        for entry in WalkDir::new(plugin_path).into_iter().filter_map(|e| {
-            let item = e.unwrap();
-            // every file only load once
-            if self
-                .plugin_files
-                .contains(&item.path().to_str().unwrap().to_string())
-            {
-                return None;
-            }
-
-            let file_type = item.file_type();
-            if !file_type.is_file() {
-                return None;
-            }
-
-            if let Some(path) = item.path().extension() {
-                if let Some(suffix) = path.to_str() {
-                    if suffix == "dylib" || suffix == "so" || suffix == "dll" {
-                        info!(
-                            "load plugin from library file:{}",
-                            item.path().to_str().unwrap()
-                        );
-                        return Some(item);
-                    }
-                }
-            }
-
-            None
-        }) {
-            plugin_files.push(entry);
-        }
-        Ok(plugin_files)
-    }
-}
diff --git a/ballista/core/src/plugin/udf.rs b/ballista/core/src/plugin/udf.rs
deleted file mode 100644
index 36ebd5e3..00000000
--- a/ballista/core/src/plugin/udf.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use crate::error::{BallistaError, Result};
-use crate::plugin::plugin_manager::global_plugin_manager;
-use crate::plugin::{Plugin, PluginEnum, PluginRegistrar};
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
-use libloading::{Library, Symbol};
-use std::any::Any;
-use std::collections::HashMap;
-use std::io;
-use std::sync::Arc;
-
-/// UDF plugin trait
-pub trait UDFPlugin: Plugin {
-    /// get a ScalarUDF by name
-    fn get_scalar_udf_by_name(&self, fun_name: &str) -> Result<ScalarUDF>;
-
-    /// return all udf names in the plugin
-    fn udf_names(&self) -> Result<Vec<String>>;
-
-    /// get a aggregate udf by name
-    fn get_aggregate_udf_by_name(&self, fun_name: &str) -> Result<AggregateUDF>;
-
-    /// return all udaf names
-    fn udaf_names(&self) -> Result<Vec<String>>;
-}
-
-/// UDFPluginManager
-#[derive(Default, Clone)]
-pub struct UDFPluginManager {
-    /// scalar udfs
-    pub scalar_udfs: HashMap<String, Arc<ScalarUDF>>,
-
-    /// aggregate udfs
-    pub aggregate_udfs: HashMap<String, Arc<AggregateUDF>>,
-
-    /// All libraries load from the plugin dir.
-    pub libraries: Vec<Arc<Library>>,
-}
-
-impl PluginRegistrar for UDFPluginManager {
-    unsafe fn load(&mut self, library: Arc<Library>) -> Result<()> {
-        type PluginRegister = unsafe fn() -> Box<dyn UDFPlugin>;
-        let register_fun: Symbol<PluginRegister> =
-            library.get(b"registrar_udf_plugin\0").map_err(|e| {
-                BallistaError::IoError(io::Error::new(
-                    io::ErrorKind::Other,
-                    format!("not found fn registrar_udf_plugin in the library: {e}"),
-                ))
-            })?;
-
-        let udf_plugin: Box<dyn UDFPlugin> = register_fun();
-        udf_plugin
-            .udf_names()
-            .unwrap()
-            .iter()
-            .try_for_each(|udf_name| {
-                if self.scalar_udfs.contains_key(udf_name) {
-                    Err(BallistaError::IoError(io::Error::new(
-                        io::ErrorKind::Other,
-                        format!("udf name: {udf_name} already exists"),
-                    )))
-                } else {
-                    let scalar_udf = udf_plugin.get_scalar_udf_by_name(udf_name)?;
-                    self.scalar_udfs
-                        .insert(udf_name.to_string(), Arc::new(scalar_udf));
-                    Ok(())
-                }
-            })?;
-
-        udf_plugin
-            .udaf_names()
-            .unwrap()
-            .iter()
-            .try_for_each(|udaf_name| {
-                if self.aggregate_udfs.contains_key(udaf_name) {
-                    Err(BallistaError::IoError(io::Error::new(
-                        io::ErrorKind::Other,
-                        format!("udaf name: {udaf_name} already exists"),
-                    )))
-                } else {
-                    let aggregate_udf =
-                        udf_plugin.get_aggregate_udf_by_name(udaf_name)?;
-                    self.aggregate_udfs
-                        .insert(udaf_name.to_string(), Arc::new(aggregate_udf));
-                    Ok(())
-                }
-            })?;
-        self.libraries.push(library);
-        Ok(())
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
-
-/// Declare a udf plugin registrar callback
-///
-/// # Notes
-///
-/// This works by automatically generating an `extern "C"` function named `registrar_udf_plugin` with a
-/// pre-defined signature and symbol name.
-/// Therefore you will only be able to declare one plugin per library.
-#[macro_export]
-macro_rules! declare_udf_plugin {
-    ($curr_plugin_type:ty, $constructor:path) => {
-        #[no_mangle]
-        pub extern "C" fn registrar_udf_plugin() -> Box<dyn $crate::plugin::udf::UDFPlugin> {
-            // make sure the constructor is the correct type.
-            let constructor: fn() -> $curr_plugin_type = $constructor;
-            let object = constructor();
-            Box::new(object)
-        }
-
-        $crate::declare_plugin!($crate::plugin::PluginEnum::UDF);
-    };
-}
-
-/// get a Option of Immutable UDFPluginManager
-pub fn get_udf_plugin_manager(path: &str) -> Option<UDFPluginManager> {
-    let udf_plugin_manager_opt = {
-        let gpm = global_plugin_manager(path).lock().unwrap();
-        let plugin_registrar_opt = gpm.plugin_managers.get(&PluginEnum::UDF);
-        if let Some(plugin_registrar) = plugin_registrar_opt {
-            if let Some(udf_plugin_manager) =
-                plugin_registrar.as_any().downcast_ref::<UDFPluginManager>()
-            {
-                return Some(udf_plugin_manager.clone());
-            } else {
-                return None;
-            }
-        }
-        None
-    };
-    udf_plugin_manager_opt
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to