This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_state in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 225d8227d159420c8cdf50cd278bc09881a39d13 Author: spetz <[email protected]> AuthorDate: Mon Jun 30 21:39:26 2025 +0200 feat(connectors): add connectors state storage using local files --- .gitignore | 1 + core/connectors/runtime/README.md | 5 +- core/connectors/runtime/config.toml | 3 + core/connectors/runtime/src/configs.rs | 9 ++ core/connectors/runtime/src/main.rs | 32 +++++-- core/connectors/runtime/src/source.rs | 66 +++++++++++--- core/connectors/runtime/src/state.rs | 105 +++++++++++++++++++++++ core/connectors/runtime/src/stream.rs | 61 +++++++------ core/connectors/sdk/src/lib.rs | 12 +++ core/connectors/sdk/src/source.rs | 33 +++++-- core/connectors/sources/random_source/src/lib.rs | 24 +++++- 11 files changed, 296 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 0e722dca..0bde9d88 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ target* .DS_Store .gradle local_data* +local_state* /bench_* /sdk/errors_table/ /rust-clippy-results.sarif diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md index ce5b2a82..592cbdfa 100644 --- a/core/connectors/runtime/README.md +++ b/core/connectors/runtime/README.md @@ -10,7 +10,7 @@ Internally, [dlopen2](https://github.com/OpenByteDev/dlopen2) provides a safe an By default, runtime will look for the configuration file, to decide which connectors to load and how to configure them. -The minimal viable configuration requires at least the Iggy credentials, to create 2 separate instances of producer & consumer connections. +The minimal viable configuration requires at least the Iggy credentials, to create 2 separate instances of producer & consumer connections and the state directory path where source connectors can store their optional state. ```toml [iggy] @@ -18,6 +18,9 @@ address = "localhost:8090" username = "iggy" password = "iggy" # token = "secret" # Personal Access Token (PAT) can be used instead of username and password + +[state] +path = "local_state" ``` All the other config sections start either with `sources` or `sinks` depending on the connector type. diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index 7f811ffc..ea868f8e 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -40,6 +40,9 @@ username = "iggy" password = "iggy" # token = "secret" # Personal Access Token (PAT) can be used instead of username and password +[state] +path = "local_state" + [sinks.stdout] enabled = true name = "Stdout sink" diff --git a/core/connectors/runtime/src/configs.rs b/core/connectors/runtime/src/configs.rs index 20a5dc1f..0c8d4d2b 100644 --- a/core/connectors/runtime/src/configs.rs +++ b/core/connectors/runtime/src/configs.rs @@ -44,6 +44,7 @@ pub struct RuntimeConfig { pub iggy: IggyConfig, pub sinks: HashMap<String, SinkConfig>, pub sources: HashMap<String, SourceConfig>, + pub state: StateConfig, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -61,6 +62,7 @@ pub struct SinkConfig { pub path: String, pub transforms: Option<TransformsConfig>, pub streams: Vec<StreamConsumerConfig>, + pub config_topic: Option<String>, pub config_format: Option<ConfigFormat>, pub config: Option<serde_json::Value>, } @@ -91,6 +93,8 @@ pub struct SourceConfig { pub path: String, pub transforms: Option<TransformsConfig>, pub streams: Vec<StreamProducerConfig>, + pub config_topic: Option<String>, + pub state_topic: Option<String>, pub config_format: Option<ConfigFormat>, pub config: Option<serde_json::Value>, } @@ -105,3 +109,8 @@ pub struct TransformsConfig { pub struct SharedTransformConfig { pub enabled: bool, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StateConfig { + pub path: String, +} diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 9da57d52..0219d926 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -30,6 +30,7 @@ use iggy_connector_sdk::{ transforms::Transform, }; use mimalloc::MiMalloc; +use state::StateStorage; use std::{ collections::HashMap, env, @@ -45,6 +46,7 @@ pub(crate) mod error; mod manager; mod sink; mod source; +mod state; mod stream; mod transform; @@ -57,7 +59,13 @@ const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"]; #[derive(WrapperApi)] struct SourceApi { - open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> i32, + open: extern "C" fn( + id: u32, + config_ptr: *const u8, + config_len: usize, + state_ptr: *const u8, + state_len: usize, + ) -> i32, handle: extern "C" fn(id: u32, callback: SendCallback) -> i32, close: extern "C" fn(id: u32) -> i32, } @@ -113,9 +121,18 @@ async fn main() -> Result<(), RuntimeError> { .try_deserialize() .expect("Failed to deserialize runtime config"); - let (stream_consumer, stream_producer) = stream::init(config.iggy.clone()).await?; - let sources = source::init(config.sources.clone(), &stream_producer).await?; - let sinks = sink::init(config.sinks.clone(), &stream_consumer).await?; + std::fs::create_dir_all(&config.state.path).expect("Failed to create state directory"); + + info!("State will be stored in: {}", config.state.path); + + let iggy_clients = stream::init(config.iggy.clone()).await?; + let sources = source::init( + config.sources.clone(), + &iggy_clients.producer, + &config.state.path, + ) + .await?; + let sinks = sink::init(config.sinks.clone(), &iggy_clients.consumer).await?; let mut sink_wrappers = vec![]; let mut sink_with_plugins = HashMap::new(); @@ -194,14 +211,14 @@ async fn main() -> Result<(), RuntimeError> { } } - stream_producer.shutdown().await?; - stream_consumer.shutdown().await?; + iggy_clients.producer.shutdown().await?; + iggy_clients.consumer.shutdown().await?; info!("All connectors closed. Runtime shutdown complete."); Ok(()) } -pub fn resolve_plugin_path(path: &str) -> String { +pub(crate) fn resolve_plugin_path(path: &str) -> String { let extension = path.split('.').next_back().unwrap_or_default(); if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) { path.to_string() @@ -262,6 +279,7 @@ struct SourceConnectorPlugin { config_format: Option<ConfigFormat>, transforms: Vec<Arc<dyn Transform>>, producer: Option<SourceConnectorProducer>, + state_storage: StateStorage, } struct SourceConnectorProducer { diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 0fd67baa..346fb150 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -23,7 +23,8 @@ use iggy::prelude::{ DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage, }; use iggy_connector_sdk::{ - DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, transforms::Transform, + ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, + transforms::Transform, }; use once_cell::sync::Lazy; use std::{ @@ -35,7 +36,10 @@ use tracing::{debug, error, info, warn}; use crate::{ PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin, - SourceConnectorProducer, SourceConnectorWrapper, configs::SourceConfig, resolve_plugin_path, + SourceConnectorProducer, SourceConnectorWrapper, + configs::SourceConfig, + resolve_plugin_path, + state::{FileStateProvider, StateProvider, StateStorage}, transform, }; @@ -44,6 +48,7 @@ pub static SOURCE_SENDERS: Lazy<DashMap<u32, Sender<ProducedMessages>>> = Lazy:: pub async fn init( source_configs: HashMap<String, SourceConfig>, iggy_client: &IggyClient, + state_path: &str, ) -> Result<HashMap<String, SourceConnector>, RuntimeError> { let mut source_connectors: HashMap<String, SourceConnector> = HashMap::new(); for (key, config) in source_configs { @@ -56,12 +61,17 @@ pub async fn init( let plugin_id = PLUGIN_ID.load(Ordering::Relaxed); let path = resolve_plugin_path(&config.path); info!("Initializing source container with name: {name} ({key}), plugin: {path}",); + let state_storage = get_state_storage(state_path, &key); + let state = match &state_storage { + StateStorage::File(file) => file.load().await?, + }; if let Some(container) = source_connectors.get_mut(&path) { info!("Source container for plugin: {path} is already loaded.",); init_source( &container.container, &config.config.unwrap_or_default(), plugin_id, + state, ); container.plugins.push(SourceConnectorPlugin { id: plugin_id, @@ -71,12 +81,18 @@ pub async fn init( config_format: config.config_format, producer: None, transforms: vec![], + state_storage, }); } else { let container: Container<SourceApi> = unsafe { Container::load(&path).expect("Failed to load source container") }; info!("Source container for plugin: {path} loaded successfully.",); - init_source(&container, &config.config.unwrap_or_default(), plugin_id); + init_source( + &container, + &config.config.unwrap_or_default(), + plugin_id, + state, + ); source_connectors.insert( path.to_owned(), SourceConnector { @@ -89,6 +105,7 @@ pub async fn init( config_format: config.config_format, producer: None, transforms: vec![], + state_storage, }], }, ); @@ -149,9 +166,21 @@ pub async fn init( Ok(source_connectors) } -fn init_source(container: &Container<SourceApi>, config: &serde_json::Value, id: u32) { +fn init_source( + container: &Container<SourceApi>, + config: &serde_json::Value, + id: u32, + state: Option<ConnectorState>, +) { let config = serde_json::to_string(config).expect("Invalid source config."); - (container.open)(id, config.as_ptr(), config.len()); + let state_ptr = state.as_ref().map_or(std::ptr::null(), |s| s.0.as_ptr()); + let state_len = state.as_ref().map_or(0, |s| s.0.len()); + (container.open)(id, config.as_ptr(), config.len(), state_ptr, state_len); +} + +fn get_state_storage(state_path: &str, key: &str) -> StateStorage { + let path = format!("{state_path}/source_{key}.state"); + StateStorage::File(FileStateProvider::new(path)) } pub fn handle(sources: Vec<SourceConnectorWrapper>) { @@ -183,16 +212,16 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) { topic: producer.topic().to_string(), }; - while let Ok(received_messages) = receiver.recv_async().await { - let count = received_messages.messages.len(); + while let Ok(produced_messages) = receiver.recv_async().await { + let count = produced_messages.messages.len(); info!("Source connector with ID: {plugin_id} received {count} messages",); - let schema = received_messages.schema; + let schema = produced_messages.schema; let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count); - for message in received_messages.messages { + for message in produced_messages.messages { let Ok(payload) = schema.try_into_payload(message.payload) else { error!( "Failed to decode message payload with schema: {} for source connector with ID: {plugin_id}", - received_messages.schema + produced_messages.schema ); continue; }; @@ -241,6 +270,23 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) { producer.stream(), producer.topic() ); + + let Some(state) = produced_messages.state else { + debug!("No state provided for source connector with ID: {plugin_id}"); + continue; + }; + + match &plugin.state_storage { + StateStorage::File(file) => { + if let Err(error) = file.save(state).await { + error!( + "Failed to save state for source connector with ID: {plugin_id}. {error}" + ); + continue; + } + debug!("State saved for source connector with ID: {plugin_id}"); + } + } } }); } diff --git a/core/connectors/runtime/src/state.rs b/core/connectors/runtime/src/state.rs new file mode 100644 index 00000000..b24f8892 --- /dev/null +++ b/core/connectors/runtime/src/state.rs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::SeekFrom; + +use iggy_connector_sdk::{ConnectorState, Error}; +use strum::Display; +use tokio::{ + fs::{File, OpenOptions}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + sync::Mutex, +}; +use tracing::{debug, error, info}; + +pub trait StateProvider { + async fn load(&self) -> Result<Option<ConnectorState>, Error>; + async fn save(&self, state: ConnectorState) -> Result<(), Error>; +} + +#[derive(Debug, Display)] +pub enum StateStorage { + #[strum(to_string = "file")] + File(FileStateProvider), +} + +#[derive(Debug)] +pub struct FileStateProvider { + path: String, + file: Mutex<Option<File>>, +} + +impl FileStateProvider { + pub fn new(path: String) -> Self { + FileStateProvider { + path, + file: Mutex::new(None), + } + } +} + +impl StateProvider for FileStateProvider { + async fn load(&self) -> Result<Option<ConnectorState>, Error> { + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&self.path) + .await + .map_err(|_| Error::CannotOpenStateFile)?; + + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).await.map_err(|error| { + error!("Cannot read state file: {}. {error}.", self.path); + Error::CannotReadStateFile + })?; + self.file.lock().await.replace(file); + if buffer.is_empty() { + info!("State file is empty: {}", self.path); + Ok(None) + } else { + info!("Loaded state file: {}", self.path); + Ok(Some(ConnectorState(buffer))) + } + } + + async fn save(&self, state: ConnectorState) -> Result<(), Error> { + let mut file = self.file.lock().await; + let Some(file) = file.as_mut() else { + return Err(Error::CannotReadStateFile); + }; + + file.set_len(0).await.map_err(|error| { + error!("Cannot truncate state file: {}. {error}.", self.path); + Error::CannotWriteStateFile + })?; + + file.seek(SeekFrom::Start(0)).await.map_err(|error| { + error!("Cannot seek state file: {}. {error}.", self.path); + Error::CannotWriteStateFile + })?; + + file.write_all(&state.0).await.map_err(|error| { + error!("Cannot write state file: {}. {error}.", self.path); + Error::CannotWriteStateFile + })?; + + debug!("Saved state file: {}", self.path); + Ok(()) + } +} diff --git a/core/connectors/runtime/src/stream.rs b/core/connectors/runtime/src/stream.rs index f67afd31..3f62cf22 100644 --- a/core/connectors/runtime/src/stream.rs +++ b/core/connectors/runtime/src/stream.rs @@ -1,38 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use iggy::prelude::{Client, IggyClient, IggyClientBuilder}; use tracing::{error, info}; use crate::{configs::IggyConfig, error::RuntimeError}; -pub async fn init(config: IggyConfig) -> Result<(IggyClient, IggyClient), RuntimeError> { - let iggy_address = config.address; - let iggy_username = config.username; - let iggy_password = config.password; - let iggy_token = config.token; - let consumer_client = create_client( - &iggy_address, - iggy_username.as_deref(), - iggy_password.as_deref(), - iggy_token.as_deref(), - ) - .await?; - consumer_client.connect().await?; - let producer_client = create_client( - &iggy_address, - iggy_username.as_deref(), - iggy_password.as_deref(), - iggy_token.as_deref(), - ) - .await?; - producer_client.connect().await?; - Ok((consumer_client, producer_client)) +pub struct IggyClients { + pub producer: IggyClient, + pub consumer: IggyClient, +} + +pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> { + let consumer = create_client(&config).await?; + let producer = create_client(&config).await?; + let iggy_clients = IggyClients { producer, consumer }; + Ok(iggy_clients) } -async fn create_client( - address: &str, - username: Option<&str>, - password: Option<&str>, - token: Option<&str>, -) -> Result<IggyClient, RuntimeError> { +async fn create_client(config: &IggyConfig) -> Result<IggyClient, RuntimeError> { + let address = config.address.to_owned(); + let username = config.username.to_owned(); + let password = config.password.to_owned(); + let token = config.token.to_owned(); + let connection_string = if let Some(token) = token { if token.is_empty() { error!("Iggy token cannot be empty (if username and password are not provided)"); diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 5b1468f6..07d7a7bc 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -49,6 +49,9 @@ pub fn get_runtime() -> &'static Runtime { RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")) } +#[derive(Debug, Serialize, Deserialize)] +pub struct ConnectorState(pub Vec<u8>); + /// The Source trait defines the interface for a source connector, responsible for producing the messages to the configured stream and topic. /// Once the messages are produced (e.g. fetched from an external API), they will be sent further to the specified destination. #[async_trait] @@ -208,6 +211,7 @@ pub struct ReceivedMessage { pub struct ProducedMessages { pub schema: Schema, pub messages: Vec<ProducedMessage>, + pub state: Option<ConnectorState>, } #[repr(C)] @@ -288,4 +292,12 @@ pub enum Error { CannotDecode(Schema), #[error("Invalid protobuf payload.")] InvalidProtobufPayload, + #[error("Cannot open state file")] + CannotOpenStateFile, + #[error("Cannot read state file")] + CannotReadStateFile, + #[error("Cannot write state file")] + CannotWriteStateFile, + #[error("Invalid state")] + InvalidState, } diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs index 5b569260..a989de5e 100644 --- a/core/connectors/sdk/src/source.rs +++ b/core/connectors/sdk/src/source.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::{Error, Source, get_runtime}; +use crate::{ConnectorState, Error, Source, get_runtime}; use serde::de::DeserializeOwned; use std::sync::Arc; use tokio::{sync::watch, task::JoinHandle}; @@ -61,10 +61,12 @@ impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> { id: u32, config_ptr: *const u8, config_len: usize, + state_ptr: *const u8, + state_len: usize, factory: F, ) -> i32 where - F: FnOnce(u32, C) -> T, + F: FnOnce(u32, C, Option<ConnectorState>) -> T, C: DeserializeOwned, { unsafe { @@ -80,7 +82,15 @@ impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> { return -1; }; - let mut source = factory(id, config); + let state = if state_ptr.is_null() { + None + } else { + let state = std::slice::from_raw_parts(state_ptr, state_len); + let state = ConnectorState(state.to_vec()); + Some(state) + }; + + let mut source = factory(id, config, state); let runtime = get_runtime(); let result = runtime.block_on(source.open()); self.id = id; @@ -200,9 +210,22 @@ macro_rules! source_connector { #[cfg(not(test))] #[unsafe(no_mangle)] - unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 { + unsafe extern "C" fn open( + id: u32, + config_ptr: *const u8, + config_len: usize, + state_ptr: *const u8, + state_len: usize, + ) -> i32 { let mut container = SourceContainer::new(id); - let result = container.open(id, config_ptr, config_len, <$type>::new); + let result = container.open( + id, + config_ptr, + config_len, + state_ptr, + state_len, + <$type>::new, + ); INSTANCES.insert(id, container); result } diff --git a/core/connectors/sources/random_source/src/lib.rs b/core/connectors/sources/random_source/src/lib.rs index d92f7b74..927a76c9 100644 --- a/core/connectors/sources/random_source/src/lib.rs +++ b/core/connectors/sources/random_source/src/lib.rs @@ -20,7 +20,7 @@ use std::{str::FromStr, time::Duration}; use async_trait::async_trait; use iggy_connector_sdk::{ - Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, + ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector, }; use rand::{ Rng, @@ -57,17 +57,31 @@ struct State { } impl RandomSource { - pub fn new(id: u32, config: RandomSourceConfig) -> Self { + pub fn new(id: u32, config: RandomSourceConfig, state: Option<ConnectorState>) -> Self { let interval = config.interval.unwrap_or("1s".to_string()); let interval = humantime::Duration::from_str(&interval) - .unwrap_or(humantime::Duration::from_str("1s").unwrap()); + .unwrap_or(humantime::Duration::from_str("1s").expect("Failed to parse interval")); + + let current_number = if let Some(state) = state { + u64::from_le_bytes( + state.0[0..8] + .try_into() + .inspect_err(|error| { + error!("Failed to convert state to u64. {error}"); + }) + .unwrap_or_default(), + ) + } else { + 0 + } as usize; + RandomSource { id, max_count: config.max_count, interval: *interval, messages_range: config.messages_range.unwrap_or((10, 50)), payload_size: config.payload_size.unwrap_or(100), - state: Mutex::new(State { current_number: 0 }), + state: Mutex::new(State { current_number }), } } @@ -140,6 +154,7 @@ impl Source for RandomSource { return Ok(ProducedMessages { schema: Schema::Json, messages: vec![], + state: Some(ConnectorState(state.current_number.to_le_bytes().to_vec())), }); } } @@ -154,6 +169,7 @@ impl Source for RandomSource { Ok(ProducedMessages { schema: Schema::Json, messages, + state: Some(ConnectorState(state.current_number.to_le_bytes().to_vec())), }) }
