This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new bb5388b4 remove plugin subsystem ... (#1070)
bb5388b4 is described below
commit bb5388b425146fa57932a352ca74e96ac2c3d5f8
Author: Marko Milenković <[email protected]>
AuthorDate: Sat Oct 12 00:28:13 2024 +0100
remove plugin subsystem ... (#1070)
... as it was never finished
---
ballista/core/Cargo.toml | 1 -
ballista/core/src/lib.rs | 2 -
ballista/core/src/plugin/mod.rs | 128 ------------------------
ballista/core/src/plugin/plugin_manager.rs | 150 ----------------------------
ballista/core/src/plugin/udf.rs | 151 -----------------------------
5 files changed, 432 deletions(-)
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 8a01f56f..daabe8a2 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -60,7 +60,6 @@ futures = "0.3"
hashbrown = "0.14"
itertools = "0.12"
-libloading = "0.8.0"
log = "0.4"
md-5 = { version = "^0.10.0" }
object_store = { workspace = true }
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index 301bc050..5306e8b9 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -31,8 +31,6 @@ pub mod error;
pub mod event_loop;
pub mod execution_plans;
pub mod object_store_registry;
-/// some plugins
-pub mod plugin;
pub mod utils;
#[macro_use]
diff --git a/ballista/core/src/plugin/mod.rs b/ballista/core/src/plugin/mod.rs
deleted file mode 100644
index 8d2cd6b9..00000000
--- a/ballista/core/src/plugin/mod.rs
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::error::Result;
-use crate::plugin::udf::UDFPluginManager;
-use libloading::Library;
-use std::any::Any;
-use std::env;
-use std::sync::Arc;
-
-/// plugin manager
-pub mod plugin_manager;
-/// udf plugin
-pub mod udf;
-
-/// CARGO_PKG_VERSION
-pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
-/// RUSTC_VERSION
-pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION");
-
-/// Top plugin trait
-pub trait Plugin {
- /// Returns the plugin as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-}
-
-/// The enum of Plugin
-#[repr(C)]
-#[derive(PartialEq, std::cmp::Eq, std::hash::Hash, Copy, Clone)]
-pub enum PluginEnum {
- /// UDF/UDAF plugin
- UDF,
-}
-
-impl PluginEnum {
- /// new a struct which impl the PluginRegistrar trait
- pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
- match self {
- PluginEnum::UDF => Box::<UDFPluginManager>::default(),
- }
- }
-}
-
-/// Every plugin need a PluginDeclaration
-#[derive(Copy, Clone)]
-pub struct PluginDeclaration {
- /// Rust doesn’t have a stable ABI, meaning different compiler versions can generate incompatible code.
- /// For these reasons, the UDF plug-in must be compiled using the same version of rustc as datafusion.
- pub rustc_version: &'static str,
-
- /// core version of the plugin. The plugin's core_version need same as plugin manager.
- pub core_version: &'static str,
-
- /// One of PluginEnum
- pub plugin_type: unsafe extern "C" fn() -> PluginEnum,
-}
-
-/// Plugin Registrar , Every plugin need implement this trait
-pub trait PluginRegistrar: Send + Sync + 'static {
- /// # Safety
- /// load plugin from library
- unsafe fn load(&mut self, library: Arc<Library>) -> Result<()>;
-
- /// Returns the plugin as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-}
-
-/// Declare a plugin's PluginDeclaration.
-///
-/// # Notes
-///
-/// This works by automatically generating an `extern "C"` function named `get_plugin_type` with a
-/// pre-defined signature and symbol name. And then generating a PluginDeclaration.
-/// Therefore you will only be able to declare one plugin per library.
-#[macro_export]
-macro_rules! declare_plugin {
- ($plugin_type:expr) => {
- #[no_mangle]
- pub extern "C" fn get_plugin_type() -> $crate::plugin::PluginEnum {
- $plugin_type
- }
-
- #[no_mangle]
- pub static plugin_declaration: $crate::plugin::PluginDeclaration =
- $crate::plugin::PluginDeclaration {
- rustc_version: $crate::plugin::RUSTC_VERSION,
- core_version: $crate::plugin::CORE_VERSION,
- plugin_type: get_plugin_type,
- };
- };
-}
-
-/// get the plugin dir
-pub fn plugin_dir() -> String {
- let current_exe_dir = match env::current_exe() {
- Ok(exe_path) => exe_path.display().to_string(),
- Err(_e) => "".to_string(),
- };
-
- // If current_exe_dir contain `deps` the root dir is the parent dir
- // eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/deps/plugins_app-067452b3ff2af70e
- // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/deps
- // else eg: /Users/xxx/workspace/rust_plugin_sty/target/debug/plugins_app
- // the plugin dir is /Users/xxx/workspace/rust_plugin_sty/target/debug/
- if current_exe_dir.contains("/deps/") {
- let i = current_exe_dir.find("/deps/").unwrap();
- String::from(&current_exe_dir.as_str()[..i + 6])
- } else {
- let i = current_exe_dir.rfind('/').unwrap();
- String::from(&current_exe_dir.as_str()[..i])
- }
-}
diff --git a/ballista/core/src/plugin/plugin_manager.rs b/ballista/core/src/plugin/plugin_manager.rs
deleted file mode 100644
index 6c19f054..00000000
--- a/ballista/core/src/plugin/plugin_manager.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use crate::error::{BallistaError, Result};
-use libloading::Library;
-use log::info;
-use std::collections::HashMap;
-use std::io;
-use std::sync::{Arc, Mutex};
-use walkdir::{DirEntry, WalkDir};
-
-use crate::plugin::{
- PluginDeclaration, PluginEnum, PluginRegistrar, CORE_VERSION, RUSTC_VERSION,
-};
-use once_cell::sync::OnceCell;
-
-/// To prevent the library from being loaded multiple times, we use once_cell defines a Arc<Mutex<GlobalPluginManager>>
-/// Because datafusion is a library, not a service, users may not need to load all plug-ins in the process.
-/// So fn global_plugin_manager return Arc<Mutex<GlobalPluginManager>>. In this way, users can load the required library through the load method of GlobalPluginManager when needed
-static INSTANCE: OnceCell<Arc<Mutex<GlobalPluginManager>>> = OnceCell::new();
-
-/// global_plugin_manager
-pub fn global_plugin_manager(
- plugin_path: &str,
-) -> &'static Arc<Mutex<GlobalPluginManager>> {
- INSTANCE.get_or_init(move || unsafe {
- let mut gpm = GlobalPluginManager::default();
- gpm.load(plugin_path).unwrap();
- Arc::new(Mutex::new(gpm))
- })
-}
-
-#[derive(Default)]
-/// manager all plugin_type's plugin_manager
-pub struct GlobalPluginManager {
- /// every plugin need a plugin registrar
- pub plugin_managers: HashMap<PluginEnum, Box<dyn PluginRegistrar>>,
-
- /// loaded plugin files
- pub plugin_files: Vec<String>,
-}
-
-impl GlobalPluginManager {
- /// # Safety
- /// find plugin file from `plugin_path` and load it .
- unsafe fn load(&mut self, plugin_path: &str) -> Result<()> {
- if "".eq(plugin_path) {
- return Ok(());
- }
- // find library file from udaf_plugin_path
- info!("load plugin from dir:{}", plugin_path);
-
- let plugin_files = self.get_all_plugin_files(plugin_path)?;
-
- for plugin_file in plugin_files {
- let library = Library::new(plugin_file.path()).map_err(|e| {
- BallistaError::IoError(io::Error::new(
- io::ErrorKind::Other,
- format!("load library error: {e}"),
- ))
- })?;
-
- let library = Arc::new(library);
-
- let dec = library.get::<*mut PluginDeclaration>(b"plugin_declaration\0");
- if dec.is_err() {
- info!(
- "not found plugin_declaration in the library: {}",
- plugin_file.path().to_str().unwrap()
- );
- continue;
- }
-
- let dec = dec.unwrap().read();
-
- // ersion checks to prevent accidental ABI incompatibilities
- if dec.rustc_version != RUSTC_VERSION || dec.core_version != CORE_VERSION {
- return Err(BallistaError::IoError(io::Error::new(
- io::ErrorKind::Other,
- "Version mismatch",
- )));
- }
-
- let plugin_enum = (dec.plugin_type)();
- let curr_plugin_manager = match self.plugin_managers.get_mut(&plugin_enum) {
- None => {
- let plugin_manager = plugin_enum.init_plugin_manager();
- self.plugin_managers.insert(plugin_enum, plugin_manager);
- self.plugin_managers.get_mut(&plugin_enum).unwrap()
- }
- Some(manager) => manager,
- };
- curr_plugin_manager.load(library)?;
- self.plugin_files
- .push(plugin_file.path().to_str().unwrap().to_string());
- }
-
- Ok(())
- }
-
- /// get all plugin file in the dir
- fn get_all_plugin_files(&self, plugin_path: &str) -> io::Result<Vec<DirEntry>> {
- let mut plugin_files = Vec::new();
- for entry in WalkDir::new(plugin_path).into_iter().filter_map(|e| {
- let item = e.unwrap();
- // every file only load once
- if self
- .plugin_files
- .contains(&item.path().to_str().unwrap().to_string())
- {
- return None;
- }
-
- let file_type = item.file_type();
- if !file_type.is_file() {
- return None;
- }
-
- if let Some(path) = item.path().extension() {
- if let Some(suffix) = path.to_str() {
- if suffix == "dylib" || suffix == "so" || suffix == "dll" {
- info!(
- "load plugin from library file:{}",
- item.path().to_str().unwrap()
- );
- return Some(item);
- }
- }
- }
-
- None
- }) {
- plugin_files.push(entry);
- }
- Ok(plugin_files)
- }
-}
diff --git a/ballista/core/src/plugin/udf.rs b/ballista/core/src/plugin/udf.rs
deleted file mode 100644
index 36ebd5e3..00000000
--- a/ballista/core/src/plugin/udf.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use crate::error::{BallistaError, Result};
-use crate::plugin::plugin_manager::global_plugin_manager;
-use crate::plugin::{Plugin, PluginEnum, PluginRegistrar};
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
-use libloading::{Library, Symbol};
-use std::any::Any;
-use std::collections::HashMap;
-use std::io;
-use std::sync::Arc;
-
-/// UDF plugin trait
-pub trait UDFPlugin: Plugin {
- /// get a ScalarUDF by name
- fn get_scalar_udf_by_name(&self, fun_name: &str) -> Result<ScalarUDF>;
-
- /// return all udf names in the plugin
- fn udf_names(&self) -> Result<Vec<String>>;
-
- /// get a aggregate udf by name
- fn get_aggregate_udf_by_name(&self, fun_name: &str) -> Result<AggregateUDF>;
-
- /// return all udaf names
- fn udaf_names(&self) -> Result<Vec<String>>;
-}
-
-/// UDFPluginManager
-#[derive(Default, Clone)]
-pub struct UDFPluginManager {
- /// scalar udfs
- pub scalar_udfs: HashMap<String, Arc<ScalarUDF>>,
-
- /// aggregate udfs
- pub aggregate_udfs: HashMap<String, Arc<AggregateUDF>>,
-
- /// All libraries load from the plugin dir.
- pub libraries: Vec<Arc<Library>>,
-}
-
-impl PluginRegistrar for UDFPluginManager {
- unsafe fn load(&mut self, library: Arc<Library>) -> Result<()> {
- type PluginRegister = unsafe fn() -> Box<dyn UDFPlugin>;
- let register_fun: Symbol<PluginRegister> =
- library.get(b"registrar_udf_plugin\0").map_err(|e| {
- BallistaError::IoError(io::Error::new(
- io::ErrorKind::Other,
- format!("not found fn registrar_udf_plugin in the library: {e}"),
- ))
- })?;
-
- let udf_plugin: Box<dyn UDFPlugin> = register_fun();
- udf_plugin
- .udf_names()
- .unwrap()
- .iter()
- .try_for_each(|udf_name| {
- if self.scalar_udfs.contains_key(udf_name) {
- Err(BallistaError::IoError(io::Error::new(
- io::ErrorKind::Other,
- format!("udf name: {udf_name} already exists"),
- )))
- } else {
- let scalar_udf = udf_plugin.get_scalar_udf_by_name(udf_name)?;
- self.scalar_udfs
- .insert(udf_name.to_string(), Arc::new(scalar_udf));
- Ok(())
- }
- })?;
-
- udf_plugin
- .udaf_names()
- .unwrap()
- .iter()
- .try_for_each(|udaf_name| {
- if self.aggregate_udfs.contains_key(udaf_name) {
- Err(BallistaError::IoError(io::Error::new(
- io::ErrorKind::Other,
- format!("udaf name: {udaf_name} already exists"),
- )))
- } else {
- let aggregate_udf =
- udf_plugin.get_aggregate_udf_by_name(udaf_name)?;
- self.aggregate_udfs
- .insert(udaf_name.to_string(), Arc::new(aggregate_udf));
- Ok(())
- }
- })?;
- self.libraries.push(library);
- Ok(())
- }
-
- fn as_any(&self) -> &dyn Any {
- self
- }
-}
-
-/// Declare a udf plugin registrar callback
-///
-/// # Notes
-///
-/// This works by automatically generating an `extern "C"` function named `registrar_udf_plugin` with a
-/// pre-defined signature and symbol name.
-/// Therefore you will only be able to declare one plugin per library.
-#[macro_export]
-macro_rules! declare_udf_plugin {
- ($curr_plugin_type:ty, $constructor:path) => {
- #[no_mangle]
- pub extern "C" fn registrar_udf_plugin() -> Box<dyn $crate::plugin::udf::UDFPlugin> {
- // make sure the constructor is the correct type.
- let constructor: fn() -> $curr_plugin_type = $constructor;
- let object = constructor();
- Box::new(object)
- }
-
- $crate::declare_plugin!($crate::plugin::PluginEnum::UDF);
- };
-}
-
-/// get a Option of Immutable UDFPluginManager
-pub fn get_udf_plugin_manager(path: &str) -> Option<UDFPluginManager> {
- let udf_plugin_manager_opt = {
- let gpm = global_plugin_manager(path).lock().unwrap();
- let plugin_registrar_opt = gpm.plugin_managers.get(&PluginEnum::UDF);
- if let Some(plugin_registrar) = plugin_registrar_opt {
- if let Some(udf_plugin_manager) =
- plugin_registrar.as_any().downcast_ref::<UDFPluginManager>()
- {
- return Some(udf_plugin_manager.clone());
- } else {
- return None;
- }
- }
- None
- };
- udf_plugin_manager_opt
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]