This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_state
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 225d8227d159420c8cdf50cd278bc09881a39d13
Author: spetz <[email protected]>
AuthorDate: Mon Jun 30 21:39:26 2025 +0200

    feat(connectors): add connectors state storage using local files
---
 .gitignore                                       |   1 +
 core/connectors/runtime/README.md                |   5 +-
 core/connectors/runtime/config.toml              |   3 +
 core/connectors/runtime/src/configs.rs           |   9 ++
 core/connectors/runtime/src/main.rs              |  32 +++++--
 core/connectors/runtime/src/source.rs            |  66 +++++++++++---
 core/connectors/runtime/src/state.rs             | 105 +++++++++++++++++++++++
 core/connectors/runtime/src/stream.rs            |  61 +++++++------
 core/connectors/sdk/src/lib.rs                   |  12 +++
 core/connectors/sdk/src/source.rs                |  33 +++++--
 core/connectors/sources/random_source/src/lib.rs |  24 +++++-
 11 files changed, 296 insertions(+), 55 deletions(-)

diff --git a/.gitignore b/.gitignore
index 0e722dca..0bde9d88 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ target*
 .DS_Store
 .gradle
 local_data*
+local_state*
 /bench_*
 /sdk/errors_table/
 /rust-clippy-results.sarif
diff --git a/core/connectors/runtime/README.md 
b/core/connectors/runtime/README.md
index ce5b2a82..592cbdfa 100644
--- a/core/connectors/runtime/README.md
+++ b/core/connectors/runtime/README.md
@@ -10,7 +10,7 @@ Internally, [dlopen2](https://github.com/OpenByteDev/dlopen2) 
provides a safe an
 
 By default, runtime will look for the configuration file, to decide which 
connectors to load and how to configure them.
 
-The minimal viable configuration requires at least the Iggy credentials, to 
create 2 separate instances of producer & consumer connections.
+The minimal viable configuration requires at least the Iggy credentials, to 
create 2 separate instances of producer & consumer connections and the state 
directory path where source connectors can store their optional state.
 
 ```toml
 [iggy]
@@ -18,6 +18,9 @@ address = "localhost:8090"
 username = "iggy"
 password = "iggy"
 # token = "secret" # Personal Access Token (PAT) can be used instead of 
username and password
+
+[state]
+path = "local_state"
 ```
 
 All the other config sections start either with `sources` or `sinks` depending 
on the connector type.
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index 7f811ffc..ea868f8e 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -40,6 +40,9 @@ username = "iggy"
 password = "iggy"
 # token = "secret" # Personal Access Token (PAT) can be used instead of 
username and password
 
+[state]
+path = "local_state"
+
 [sinks.stdout]
 enabled = true
 name = "Stdout sink"
diff --git a/core/connectors/runtime/src/configs.rs 
b/core/connectors/runtime/src/configs.rs
index 20a5dc1f..0c8d4d2b 100644
--- a/core/connectors/runtime/src/configs.rs
+++ b/core/connectors/runtime/src/configs.rs
@@ -44,6 +44,7 @@ pub struct RuntimeConfig {
     pub iggy: IggyConfig,
     pub sinks: HashMap<String, SinkConfig>,
     pub sources: HashMap<String, SourceConfig>,
+    pub state: StateConfig,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -61,6 +62,7 @@ pub struct SinkConfig {
     pub path: String,
     pub transforms: Option<TransformsConfig>,
     pub streams: Vec<StreamConsumerConfig>,
+    pub config_topic: Option<String>,
     pub config_format: Option<ConfigFormat>,
     pub config: Option<serde_json::Value>,
 }
@@ -91,6 +93,8 @@ pub struct SourceConfig {
     pub path: String,
     pub transforms: Option<TransformsConfig>,
     pub streams: Vec<StreamProducerConfig>,
+    pub config_topic: Option<String>,
+    pub state_topic: Option<String>,
     pub config_format: Option<ConfigFormat>,
     pub config: Option<serde_json::Value>,
 }
@@ -105,3 +109,8 @@ pub struct TransformsConfig {
 pub struct SharedTransformConfig {
     pub enabled: bool,
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateConfig {
+    pub path: String,
+}
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index 9da57d52..0219d926 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -30,6 +30,7 @@ use iggy_connector_sdk::{
     transforms::Transform,
 };
 use mimalloc::MiMalloc;
+use state::StateStorage;
 use std::{
     collections::HashMap,
     env,
@@ -45,6 +46,7 @@ pub(crate) mod error;
 mod manager;
 mod sink;
 mod source;
+mod state;
 mod stream;
 mod transform;
 
@@ -57,7 +59,13 @@ const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", 
"dll"];
 
 #[derive(WrapperApi)]
 struct SourceApi {
-    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    open: extern "C" fn(
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        state_ptr: *const u8,
+        state_len: usize,
+    ) -> i32,
     handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
     close: extern "C" fn(id: u32) -> i32,
 }
@@ -113,9 +121,18 @@ async fn main() -> Result<(), RuntimeError> {
         .try_deserialize()
         .expect("Failed to deserialize runtime config");
 
-    let (stream_consumer, stream_producer) = 
stream::init(config.iggy.clone()).await?;
-    let sources = source::init(config.sources.clone(), 
&stream_producer).await?;
-    let sinks = sink::init(config.sinks.clone(), &stream_consumer).await?;
+    std::fs::create_dir_all(&config.state.path).expect("Failed to create state 
directory");
+
+    info!("State will be stored in: {}", config.state.path);
+
+    let iggy_clients = stream::init(config.iggy.clone()).await?;
+    let sources = source::init(
+        config.sources.clone(),
+        &iggy_clients.producer,
+        &config.state.path,
+    )
+    .await?;
+    let sinks = sink::init(config.sinks.clone(), 
&iggy_clients.consumer).await?;
 
     let mut sink_wrappers = vec![];
     let mut sink_with_plugins = HashMap::new();
@@ -194,14 +211,14 @@ async fn main() -> Result<(), RuntimeError> {
         }
     }
 
-    stream_producer.shutdown().await?;
-    stream_consumer.shutdown().await?;
+    iggy_clients.producer.shutdown().await?;
+    iggy_clients.consumer.shutdown().await?;
 
     info!("All connectors closed. Runtime shutdown complete.");
     Ok(())
 }
 
-pub fn resolve_plugin_path(path: &str) -> String {
+pub(crate) fn resolve_plugin_path(path: &str) -> String {
     let extension = path.split('.').next_back().unwrap_or_default();
     if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) {
         path.to_string()
@@ -262,6 +279,7 @@ struct SourceConnectorPlugin {
     config_format: Option<ConfigFormat>,
     transforms: Vec<Arc<dyn Transform>>,
     producer: Option<SourceConnectorProducer>,
+    state_storage: StateStorage,
 }
 
 struct SourceConnectorProducer {
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
index 0fd67baa..346fb150 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -23,7 +23,8 @@ use iggy::prelude::{
     DirectConfig, HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, 
IggyMessage,
 };
 use iggy_connector_sdk::{
-    DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, 
transforms::Transform,
+    ConnectorState, DecodedMessage, Error, ProducedMessages, StreamEncoder, 
TopicMetadata,
+    transforms::Transform,
 };
 use once_cell::sync::Lazy;
 use std::{
@@ -35,7 +36,10 @@ use tracing::{debug, error, info, warn};
 
 use crate::{
     PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
-    SourceConnectorProducer, SourceConnectorWrapper, configs::SourceConfig, 
resolve_plugin_path,
+    SourceConnectorProducer, SourceConnectorWrapper,
+    configs::SourceConfig,
+    resolve_plugin_path,
+    state::{FileStateProvider, StateProvider, StateStorage},
     transform,
 };
 
@@ -44,6 +48,7 @@ pub static SOURCE_SENDERS: Lazy<DashMap<u32, 
Sender<ProducedMessages>>> = Lazy::
 pub async fn init(
     source_configs: HashMap<String, SourceConfig>,
     iggy_client: &IggyClient,
+    state_path: &str,
 ) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
     let mut source_connectors: HashMap<String, SourceConnector> = 
HashMap::new();
     for (key, config) in source_configs {
@@ -56,12 +61,17 @@ pub async fn init(
         let plugin_id = PLUGIN_ID.load(Ordering::Relaxed);
         let path = resolve_plugin_path(&config.path);
         info!("Initializing source container with name: {name} ({key}), 
plugin: {path}",);
+        let state_storage = get_state_storage(state_path, &key);
+        let state = match &state_storage {
+            StateStorage::File(file) => file.load().await?,
+        };
         if let Some(container) = source_connectors.get_mut(&path) {
             info!("Source container for plugin: {path} is already loaded.",);
             init_source(
                 &container.container,
                 &config.config.unwrap_or_default(),
                 plugin_id,
+                state,
             );
             container.plugins.push(SourceConnectorPlugin {
                 id: plugin_id,
@@ -71,12 +81,18 @@ pub async fn init(
                 config_format: config.config_format,
                 producer: None,
                 transforms: vec![],
+                state_storage,
             });
         } else {
             let container: Container<SourceApi> =
                 unsafe { Container::load(&path).expect("Failed to load source 
container") };
             info!("Source container for plugin: {path} loaded successfully.",);
-            init_source(&container, &config.config.unwrap_or_default(), 
plugin_id);
+            init_source(
+                &container,
+                &config.config.unwrap_or_default(),
+                plugin_id,
+                state,
+            );
             source_connectors.insert(
                 path.to_owned(),
                 SourceConnector {
@@ -89,6 +105,7 @@ pub async fn init(
                         config_format: config.config_format,
                         producer: None,
                         transforms: vec![],
+                        state_storage,
                     }],
                 },
             );
@@ -149,9 +166,21 @@ pub async fn init(
     Ok(source_connectors)
 }
 
-fn init_source(container: &Container<SourceApi>, config: &serde_json::Value, 
id: u32) {
+fn init_source(
+    container: &Container<SourceApi>,
+    config: &serde_json::Value,
+    id: u32,
+    state: Option<ConnectorState>,
+) {
     let config = serde_json::to_string(config).expect("Invalid source 
config.");
-    (container.open)(id, config.as_ptr(), config.len());
+    let state_ptr = state.as_ref().map_or(std::ptr::null(), |s| s.0.as_ptr());
+    let state_len = state.as_ref().map_or(0, |s| s.0.len());
+    (container.open)(id, config.as_ptr(), config.len(), state_ptr, state_len);
+}
+
+fn get_state_storage(state_path: &str, key: &str) -> StateStorage {
+    let path = format!("{state_path}/source_{key}.state");
+    StateStorage::File(FileStateProvider::new(path))
 }
 
 pub fn handle(sources: Vec<SourceConnectorWrapper>) {
@@ -183,16 +212,16 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
                     topic: producer.topic().to_string(),
                 };
 
-                while let Ok(received_messages) = receiver.recv_async().await {
-                    let count = received_messages.messages.len();
+                while let Ok(produced_messages) = receiver.recv_async().await {
+                    let count = produced_messages.messages.len();
                     info!("Source connector with ID: {plugin_id} received 
{count} messages",);
-                    let schema = received_messages.schema;
+                    let schema = produced_messages.schema;
                     let mut messages: Vec<DecodedMessage> = 
Vec::with_capacity(count);
-                    for message in received_messages.messages {
+                    for message in produced_messages.messages {
                         let Ok(payload) = 
schema.try_into_payload(message.payload) else {
                             error!(
                                 "Failed to decode message payload with schema: 
{} for source connector with ID: {plugin_id}",
-                                received_messages.schema
+                                produced_messages.schema
                             );
                             continue;
                         };
@@ -241,6 +270,23 @@ pub fn handle(sources: Vec<SourceConnectorWrapper>) {
                         producer.stream(),
                         producer.topic()
                     );
+
+                    let Some(state) = produced_messages.state else {
+                        debug!("No state provided for source connector with 
ID: {plugin_id}");
+                        continue;
+                    };
+
+                    match &plugin.state_storage {
+                        StateStorage::File(file) => {
+                            if let Err(error) = file.save(state).await {
+                                error!(
+                                    "Failed to save state for source connector 
with ID: {plugin_id}. {error}"
+                                );
+                                continue;
+                            }
+                            debug!("State saved for source connector with ID: 
{plugin_id}");
+                        }
+                    }
                 }
             });
         }
diff --git a/core/connectors/runtime/src/state.rs 
b/core/connectors/runtime/src/state.rs
new file mode 100644
index 00000000..b24f8892
--- /dev/null
+++ b/core/connectors/runtime/src/state.rs
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::SeekFrom;
+
+use iggy_connector_sdk::{ConnectorState, Error};
+use strum::Display;
+use tokio::{
+    fs::{File, OpenOptions},
+    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
+    sync::Mutex,
+};
+use tracing::{debug, error, info};
+
+pub trait StateProvider {
+    async fn load(&self) -> Result<Option<ConnectorState>, Error>;
+    async fn save(&self, state: ConnectorState) -> Result<(), Error>;
+}
+
+#[derive(Debug, Display)]
+pub enum StateStorage {
+    #[strum(to_string = "file")]
+    File(FileStateProvider),
+}
+
+#[derive(Debug)]
+pub struct FileStateProvider {
+    path: String,
+    file: Mutex<Option<File>>,
+}
+
+impl FileStateProvider {
+    pub fn new(path: String) -> Self {
+        FileStateProvider {
+            path,
+            file: Mutex::new(None),
+        }
+    }
+}
+
+impl StateProvider for FileStateProvider {
+    async fn load(&self) -> Result<Option<ConnectorState>, Error> {
+        let mut file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(false)
+            .open(&self.path)
+            .await
+            .map_err(|_| Error::CannotOpenStateFile)?;
+
+        let mut buffer = Vec::new();
+        file.read_to_end(&mut buffer).await.map_err(|error| {
+            error!("Cannot read state file: {}. {error}.", self.path);
+            Error::CannotReadStateFile
+        })?;
+        self.file.lock().await.replace(file);
+        if buffer.is_empty() {
+            info!("State file is empty: {}", self.path);
+            Ok(None)
+        } else {
+            info!("Loaded state file: {}", self.path);
+            Ok(Some(ConnectorState(buffer)))
+        }
+    }
+
+    async fn save(&self, state: ConnectorState) -> Result<(), Error> {
+        let mut file = self.file.lock().await;
+        let Some(file) = file.as_mut() else {
+            return Err(Error::CannotReadStateFile);
+        };
+
+        file.set_len(0).await.map_err(|error| {
+            error!("Cannot truncate state file: {}. {error}.", self.path);
+            Error::CannotWriteStateFile
+        })?;
+
+        file.seek(SeekFrom::Start(0)).await.map_err(|error| {
+            error!("Cannot seek state file: {}. {error}.", self.path);
+            Error::CannotWriteStateFile
+        })?;
+
+        file.write_all(&state.0).await.map_err(|error| {
+            error!("Cannot write state file: {}. {error}.", self.path);
+            Error::CannotWriteStateFile
+        })?;
+
+        debug!("Saved state file: {}", self.path);
+        Ok(())
+    }
+}
diff --git a/core/connectors/runtime/src/stream.rs 
b/core/connectors/runtime/src/stream.rs
index f67afd31..3f62cf22 100644
--- a/core/connectors/runtime/src/stream.rs
+++ b/core/connectors/runtime/src/stream.rs
@@ -1,38 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 use iggy::prelude::{Client, IggyClient, IggyClientBuilder};
 use tracing::{error, info};
 
 use crate::{configs::IggyConfig, error::RuntimeError};
 
-pub async fn init(config: IggyConfig) -> Result<(IggyClient, IggyClient), 
RuntimeError> {
-    let iggy_address = config.address;
-    let iggy_username = config.username;
-    let iggy_password = config.password;
-    let iggy_token = config.token;
-    let consumer_client = create_client(
-        &iggy_address,
-        iggy_username.as_deref(),
-        iggy_password.as_deref(),
-        iggy_token.as_deref(),
-    )
-    .await?;
-    consumer_client.connect().await?;
-    let producer_client = create_client(
-        &iggy_address,
-        iggy_username.as_deref(),
-        iggy_password.as_deref(),
-        iggy_token.as_deref(),
-    )
-    .await?;
-    producer_client.connect().await?;
-    Ok((consumer_client, producer_client))
+pub struct IggyClients {
+    pub producer: IggyClient,
+    pub consumer: IggyClient,
+}
+
+pub async fn init(config: IggyConfig) -> Result<IggyClients, RuntimeError> {
+    let consumer = create_client(&config).await?;
+    let producer = create_client(&config).await?;
+    let iggy_clients = IggyClients { producer, consumer };
+    Ok(iggy_clients)
 }
 
-async fn create_client(
-    address: &str,
-    username: Option<&str>,
-    password: Option<&str>,
-    token: Option<&str>,
-) -> Result<IggyClient, RuntimeError> {
+async fn create_client(config: &IggyConfig) -> Result<IggyClient, 
RuntimeError> {
+    let address = config.address.to_owned();
+    let username = config.username.to_owned();
+    let password = config.password.to_owned();
+    let token = config.token.to_owned();
+
     let connection_string = if let Some(token) = token {
         if token.is_empty() {
             error!("Iggy token cannot be empty (if username and password are 
not provided)");
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 5b1468f6..07d7a7bc 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -49,6 +49,9 @@ pub fn get_runtime() -> &'static Runtime {
     RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio 
runtime"))
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ConnectorState(pub Vec<u8>);
+
 /// The Source trait defines the interface for a source connector, responsible 
for producing the messages to the configured stream and topic.
 /// Once the messages are produced (e.g. fetched from an external API), they 
will be sent further to the specified destination.
 #[async_trait]
@@ -208,6 +211,7 @@ pub struct ReceivedMessage {
 pub struct ProducedMessages {
     pub schema: Schema,
     pub messages: Vec<ProducedMessage>,
+    pub state: Option<ConnectorState>,
 }
 
 #[repr(C)]
@@ -288,4 +292,12 @@ pub enum Error {
     CannotDecode(Schema),
     #[error("Invalid protobuf payload.")]
     InvalidProtobufPayload,
+    #[error("Cannot open state file")]
+    CannotOpenStateFile,
+    #[error("Cannot read state file")]
+    CannotReadStateFile,
+    #[error("Cannot write state file")]
+    CannotWriteStateFile,
+    #[error("Invalid state")]
+    InvalidState,
 }
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
index 5b569260..a989de5e 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::{Error, Source, get_runtime};
+use crate::{ConnectorState, Error, Source, get_runtime};
 use serde::de::DeserializeOwned;
 use std::sync::Arc;
 use tokio::{sync::watch, task::JoinHandle};
@@ -61,10 +61,12 @@ impl<T: Source + std::fmt::Debug + 'static> 
SourceContainer<T> {
         id: u32,
         config_ptr: *const u8,
         config_len: usize,
+        state_ptr: *const u8,
+        state_len: usize,
         factory: F,
     ) -> i32
     where
-        F: FnOnce(u32, C) -> T,
+        F: FnOnce(u32, C, Option<ConnectorState>) -> T,
         C: DeserializeOwned,
     {
         unsafe {
@@ -80,7 +82,15 @@ impl<T: Source + std::fmt::Debug + 'static> 
SourceContainer<T> {
                 return -1;
             };
 
-            let mut source = factory(id, config);
+            let state = if state_ptr.is_null() {
+                None
+            } else {
+                let state = std::slice::from_raw_parts(state_ptr, state_len);
+                let state = ConnectorState(state.to_vec());
+                Some(state)
+            };
+
+            let mut source = factory(id, config, state);
             let runtime = get_runtime();
             let result = runtime.block_on(source.open());
             self.id = id;
@@ -200,9 +210,22 @@ macro_rules! source_connector {
 
         #[cfg(not(test))]
         #[unsafe(no_mangle)]
-        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: 
usize) -> i32 {
+        unsafe extern "C" fn open(
+            id: u32,
+            config_ptr: *const u8,
+            config_len: usize,
+            state_ptr: *const u8,
+            state_len: usize,
+        ) -> i32 {
             let mut container = SourceContainer::new(id);
-            let result = container.open(id, config_ptr, config_len, 
<$type>::new);
+            let result = container.open(
+                id,
+                config_ptr,
+                config_len,
+                state_ptr,
+                state_len,
+                <$type>::new,
+            );
             INSTANCES.insert(id, container);
             result
         }
diff --git a/core/connectors/sources/random_source/src/lib.rs 
b/core/connectors/sources/random_source/src/lib.rs
index d92f7b74..927a76c9 100644
--- a/core/connectors/sources/random_source/src/lib.rs
+++ b/core/connectors/sources/random_source/src/lib.rs
@@ -20,7 +20,7 @@ use std::{str::FromStr, time::Duration};
 
 use async_trait::async_trait;
 use iggy_connector_sdk::{
-    Error, ProducedMessage, ProducedMessages, Schema, Source, source_connector,
+    ConnectorState, Error, ProducedMessage, ProducedMessages, Schema, Source, 
source_connector,
 };
 use rand::{
     Rng,
@@ -57,17 +57,31 @@ struct State {
 }
 
 impl RandomSource {
-    pub fn new(id: u32, config: RandomSourceConfig) -> Self {
+    pub fn new(id: u32, config: RandomSourceConfig, state: 
Option<ConnectorState>) -> Self {
         let interval = config.interval.unwrap_or("1s".to_string());
         let interval = humantime::Duration::from_str(&interval)
-            .unwrap_or(humantime::Duration::from_str("1s").unwrap());
+            .unwrap_or(humantime::Duration::from_str("1s").expect("Failed to 
parse interval"));
+
+        let current_number = if let Some(state) = state {
+            u64::from_le_bytes(
+                state.0[0..8]
+                    .try_into()
+                    .inspect_err(|error| {
+                        error!("Failed to convert state to u64. {error}");
+                    })
+                    .unwrap_or_default(),
+            )
+        } else {
+            0
+        } as usize;
+
         RandomSource {
             id,
             max_count: config.max_count,
             interval: *interval,
             messages_range: config.messages_range.unwrap_or((10, 50)),
             payload_size: config.payload_size.unwrap_or(100),
-            state: Mutex::new(State { current_number: 0 }),
+            state: Mutex::new(State { current_number }),
         }
     }
 
@@ -140,6 +154,7 @@ impl Source for RandomSource {
                 return Ok(ProducedMessages {
                     schema: Schema::Json,
                     messages: vec![],
+                    state: 
Some(ConnectorState(state.current_number.to_le_bytes().to_vec())),
                 });
             }
         }
@@ -154,6 +169,7 @@ impl Source for RandomSource {
         Ok(ProducedMessages {
             schema: Schema::Json,
             messages,
+            state: 
Some(ConnectorState(state.current_number.to_le_bytes().to_vec())),
         })
     }
 

Reply via email to