This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors_docs
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 4e0e80c3d90ca193cd78c91c430f779df04f2da5
Author: spetz <[email protected]>
AuthorDate: Tue Jun 3 21:29:48 2025 +0200

    Add connectors docs, WiP
---
 Cargo.lock                                         |  33 +--
 Cargo.toml                                         |   2 +-
 DEPENDENCIES.md                                    |   2 +-
 core/connectors/README.md                          |  47 +----
 core/connectors/runtime/README.md                  |  25 +++
 core/connectors/runtime/config.toml                |  34 +++-
 core/connectors/runtime/src/main.rs                |   2 +-
 core/connectors/sdk/README.md                      |  19 ++
 core/connectors/sinks/README.md                    | 208 +++++++++++++++++++
 core/connectors/sinks/stdout_sink/Cargo.toml       |   1 +
 core/connectors/sinks/stdout_sink/src/lib.rs       |  26 ++-
 core/connectors/sources/README.md                  | 223 +++++++++++++++++++++
 .../{test_source => random_source}/Cargo.toml      |   2 +-
 .../{test_source => random_source}/src/lib.rs      |  33 +--
 14 files changed, 575 insertions(+), 82 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b1556a42..d2b1d828 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3686,6 +3686,23 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "iggy_connector_random_source"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "humantime",
+ "iggy_connector_sdk",
+ "once_cell",
+ "rand 0.9.1",
+ "serde",
+ "simd-json",
+ "tokio",
+ "tracing",
+ "uuid",
+]
+
 [[package]]
 name = "iggy_connector_runtime"
 version = "0.1.0"
@@ -3738,24 +3755,8 @@ dependencies = [
  "iggy_connector_sdk",
  "once_cell",
  "serde",
- "tracing",
-]
-
-[[package]]
-name = "iggy_connector_test_source"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "dashmap",
- "humantime",
- "iggy_connector_sdk",
- "once_cell",
- "rand 0.9.1",
- "serde",
- "simd-json",
  "tokio",
  "tracing",
- "uuid",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 1f2053f7..e8fa444e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,7 @@ members = [
     "core/connectors/sdk",
     "core/connectors/sinks/quickwit_sink",
     "core/connectors/sinks/stdout_sink",
-    "core/connectors/sources/test_source",
+    "core/connectors/sources/random_source",
     "core/examples",
     "core/integration",
     "core/sdk",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c86bd8ea..65ae29a2 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -322,10 +322,10 @@ iggy-cli: 0.9.0, "Apache-2.0",
 iggy_binary_protocol: 0.7.0, "Apache-2.0",
 iggy_common: 0.7.0, "Apache-2.0",
 iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
+iggy_connector_random_source: 0.1.0, "Apache-2.0",
 iggy_connector_runtime: 0.1.0, "Apache-2.0",
 iggy_connector_sdk: 0.1.0, "Apache-2.0",
 iggy_connector_stdout_sink: 0.1.0, "Apache-2.0",
-iggy_connector_test_source: 0.1.0, "Apache-2.0",
 iggy_examples: 0.0.5, "Apache-2.0",
 impl-more: 0.1.9, "Apache-2.0 OR MIT",
 implicit-clone: 0.4.9, "Apache-2.0 OR MIT",
diff --git a/core/connectors/README.md b/core/connectors/README.md
index 3bbb1d1b..68051e50 100644
--- a/core/connectors/README.md
+++ b/core/connectors/README.md
@@ -34,58 +34,31 @@ iggy --username iggy --password iggy topic create qw 
records 1 none 1d
 
 6. Start the connector runtime `cargo r --bin iggy_connector_runtime -r` - you 
should be able to browse Quickwit UI with records being constantly added to the 
`events` index. At the same time, you should see the new messages being added 
to the `example` stream and `topic1` topic by the test source connector - you 
can use Iggy Web UI to browse the data. The messages will have applied the 
basic fields transformations.
 
-## Building the connectors
+## Runtime
 
-New connector can be built simply by implementing either `Sink` or `Source` 
trait. Please check the existing examples under `core/connectors/sinks` and 
`core/connectors/sources` directories.
+All the connectors are implemented as Rust libraries and can be used as a part 
of the connector runtime. The runtime is responsible for managing the lifecycle 
of the connectors and providing the necessary infrastructure for the connectors 
to run. For more information, please refer to the **[runtime 
documentation](https://github.com/apache/iggy/tree/master/core/connectors/runtime)**.
 
-### Sink
+## Sink
 
 Sinks are responsible for consuming the messages from the configured stream(s) 
and topic(s) and sending them further to the specified destination. For 
example, the Quickwit sink connector is responsible for sending the messages to 
the Quickwit indexer.
 
-```rust
-#[async_trait]
-pub trait Sink: Send + Sync {
-    /// Invoked when the sink is initialized, allowing it to perform any 
necessary setup.
-    async fn open(&mut self) -> Result<(), Error>;
-
-    /// Invoked every time a batch of messages is received from the configured 
stream(s) and topic(s).
-    async fn consume(
-        &self,
-        topic_metadata: &TopicMetadata,
-        messages_metadata: MessagesMetadata,
-        messages: Vec<ConsumedMessage>,
-    ) -> Result<(), Error>;
-
-    /// Invoked when the sink is closed, allowing it to perform any necessary 
cleanup.
-    async fn close(&mut self) -> Result<(), Error>;
-}
-```
+Please refer to the **[Sink 
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sinks)**
 for the details about the configuration and the sample implementation.
 
 When implementing `Sink`, make sure to use the `sink_connector!` macro to 
expose the FFI interface and allow the connector runtime to register the sink 
with the runtime.
 Each sink should have its own, custom configuration, which is passed along 
with the unique plugin ID via expected `new()` method.
 
-### Source
+## Source
 
 Sources are responsible for producing the messages to the configured stream(s) 
and topic(s). For example, the Test source connector will generate the random 
messages that will be then sent to the configured stream and topic.
 
-```rust
-#[async_trait]
-pub trait Source: Send + Sync {
-    /// Invoked when the source is initialized, allowing it to perform any 
necessary setup.
-    async fn open(&mut self) -> Result<(), Error>;
+Please refer to the **[Source 
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
 for the details about the configuration and the sample implementation.
 
-    /// Invoked every time a batch of messages is produced to the configured 
stream and topic.
-    async fn poll(&self) -> Result<ProducedMessages, Error>;
-
-    /// Invoked when the source is closed, allowing it to perform any 
necessary cleanup.
-    async fn close(&mut self) -> Result<(), Error>;
-}
-```
-
-When implementing `Source`, make sure to use the `source_connector!` macro to 
expose the FFI interface and allow the connector runtime to register the source 
with the runtime.
-Each source should have its own, custom configuration, which is passed along 
with the unique plugin ID via expected `new()` method.
+## Building the connectors
 
+New connector can be built simply by implementing either `Sink` or `Source` 
trait. Please check the 
**[sink](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** or 
**[source](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
 documentation, as well as the existing examples under `/sinks` and `/sources` 
directories.
 
 ## Transformations
 
 Field transformations (depending on the supported payload formats) can be 
applied to the messages either before they are sent to the specified topic 
(e.g. when produced by the source connectors), or before consumed by the sink 
connectors. To add the new transformation, simply implement the `Transform` 
trait and extend the existing `load` function. Each transform may have its own, 
custom configuration.
+
+To find out more about the transforms, stream decoders or encoders, please 
refer to the **[SDK 
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sdk)**.
diff --git a/core/connectors/runtime/README.md 
b/core/connectors/runtime/README.md
new file mode 100644
index 00000000..23a35c11
--- /dev/null
+++ b/core/connectors/runtime/README.md
@@ -0,0 +1,25 @@
+# Apache Iggy Connectors - Runtime
+
+Runtime is responsible for managing the lifecycle of the connectors and 
providing the necessary infrastructure for the connectors to run.
+
+The runtime uses a shared [Tokio runtime](https://tokio.rs) to manage the 
asynchronous tasks and events across all connectors. Additionally, it has 
built-in support for logging via 
[tracing](https://docs.rs/tracing/latest/tracing/) crate.
+
+The connectors are implemented as Rust libraries, and these are loaded 
dynamically during the runtime initialization process.
+
+Internally, [dlopen2](https://github.com/OpenByteDev/dlopen2) provides a safe 
and efficient way of loading the plugins via C FFI.
+
+By default, the runtime will look for a configuration file to decide which 
connectors to load and how to configure them.
+
+The minimal viable configuration requires at least the Iggy credentials, to 
create 2 separate instances of producer & consumer connections.
+
+```toml
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+# token = "secret" # Personal Access Token (PAT) can be used instead of 
username and password
+```
+
+All the other config sections start either with `sources` or `sinks` depending 
on the connector type.
+
+Keep in mind that any of the `toml`, `yaml`, or `json` formats is supported 
for the configuration file. The path to the configuration can be overridden by 
the `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH` environment variable. Each 
configuration section can also be additionally updated by using the following 
convention `IGGY_CONNECTORS_SECTION_NAME.KEY_NAME` e.g. 
`IGGY_CONNECTORS_IGGY_USERNAME` and so on.
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
index 67b7d5bc..d7361472 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -21,28 +21,48 @@ username = "iggy"
 password = "iggy"
 # token = "secret" # Personal Access Token (PAT) can be used instead of 
username and password
 
-[sources.test1]
+[sinks.stdout]
 enabled = true
-name = "Test source"
-path = "target/release/libiggy_connector_test_source"
+name = "Stdout sink"
+path = "target/release/libiggy_connector_stdout_sink"
 
-[[sources.test1.streams]]
+[[sinks.stdout.streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_size = 100
+poll_interval = "5ms"
+consumer_group = "qw_sink_connector"
+
+[sinks.stdout.transforms.add_fields]
+enabled = true
+
+[[sinks.stdout.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+
+[sources.random]
+enabled = true
+name = "Random source"
+path = "target/release/libiggy_connector_random_source"
+
+[[sources.random.streams]]
 stream = "example"
 topic = "topic1"
 schema = "json"
 batch_size = 1000
 send_interval = "5ms"
 
-[sources.test1.config]
+[sources.random.config]
 interval = "100ms"
 # max_count = 1000
 messages_range = [10, 50]
 payload_size = 200
 
-[sources.test1.transforms.add_fields]
+[sources.random.transforms.add_fields]
 enabled = true
 
-[[sources.test1.transforms.add_fields.fields]]
+[[sources.random.transforms.add_fields.fields]]
 key = "test_field"
 value.static = "hello!"
 
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
index cbffc619..f87057bf 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -83,7 +83,7 @@ async fn main() -> Result<(), RuntimeError> {
     info!("Starting Iggy Connector Runtime, loading configuration from: 
{config_path}...");
     let builder = Config::builder()
         .add_source(File::with_name(&config_path))
-        .add_source(Environment::with_prefix("IGGY").separator("_"));
+        
.add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_"));
 
     let config: RuntimeConfig = builder
         .build()
diff --git a/core/connectors/sdk/README.md b/core/connectors/sdk/README.md
new file mode 100644
index 00000000..13ee85fa
--- /dev/null
+++ b/core/connectors/sdk/README.md
@@ -0,0 +1,19 @@
+# Apache Iggy Connectors - SDK
+
+SDK provides the commonly used structs and traits such as `Sink` and `Source`, 
along with the `sink_connector` and `source_connector` macros to be used when 
developing connectors.
+
+Moreover, it contains both, the `decoders` and `encoders` modules, 
implementing either `StreamDecoder` or `StreamEncoder` traits, which are used 
when consuming or producing data from/to Iggy streams.
+
+SDK is WiP, and it'd certainly benefit from having the support of multiple 
format schemas, such as Protobuf, Avro, Flatbuffers etc. including 
decoding/encoding the data between the different formats (when applicable) and 
supporting the data transformations whenever possible (easy for JSON, but 
complex for Bincode for example).
+
+
+Last but not least, the different `transforms` are available, to transform 
(add, update, delete etc.) the particular fields of the data being processed 
via external configuration. It's as simple as adding a new transform to the 
`transforms` section of the particular connector configuration:
+
+```toml
+[sources.random.transforms.add_fields]
+enabled = true
+
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
new file mode 100644
index 00000000..248da159
--- /dev/null
+++ b/core/connectors/sinks/README.md
@@ -0,0 +1,208 @@
+# Apache Iggy Connectors - Sink
+
+## Overview
+
+Sink connectors are responsible for writing data from Iggy streams to external 
systems or destinations. They provide a way to integrate Apache Iggy with 
various data sources and destinations, enabling seamless data flow and 
processing.
+
+The sink is represented by the single `Sink` trait, which defines the basic 
interface for all sink connectors. It provides methods for initializing the 
sink, writing data to the external destination, and closing the sink.
+
+```rust
+#[async_trait]
+pub trait Sink: Send + Sync {
+    /// Invoked when the sink is initialized, allowing it to perform any 
necessary setup.
+    async fn open(&mut self) -> Result<(), Error>;
+
+    /// Invoked every time a batch of messages is received from the configured 
stream(s) and topic(s).
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error>;
+
+    /// Invoked when the sink is closed, allowing it to perform any necessary 
cleanup.
+    async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+
+## Configuration
+
+Sink is configured in the default `config` file used by the runtime. Each sink 
configuration is part of the map of <String, SinkConfig>, which can be 
represented using toml, json, or yaml.
+
+```rust
+pub struct SinkConfig {
+    pub enabled: bool,
+    pub name: String,
+    pub path: String,
+    pub transforms: Option<TransformsConfig>,
+    pub streams: Vec<StreamConsumerConfig>,
+    pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a sink connector, using `stdout` as 
its unique identifier:
+
+```toml
+# Required configuration for a sink connector
+[sinks.stdout]
+enabled = true
+name = "Stdout sink"
+path = "target/release/libiggy_connector_stdout_sink"
+
+# Collection of the streams from which messages are consumed
+[[sinks.stdout.streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_size = 100
+poll_interval = "5ms"
+consumer_group = "stdout_sink_connector"
+
+# Custom configuration for the sink connector, deserialized to type T from 
`config` field
+[sinks.stdout.config]
+print_payload = true
+
+# Optional data transformation(s) to be applied after consuming messages from 
the stream
+[sinks.stdout.transforms.add_fields]
+enabled = true
+
+# Collection of the fields transforms to be applied after consuming messages 
from the stream
+[[sinks.stdout.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement the example sink connector, which will simply print the 
messages to the standard output.
+
+Additionally, our sink connector will have its own state, which can be used 
e.g. to track the overall progress or store some relevant information when 
ingesting the data further to the external sources or tooling.
+
+Also, when implementing the sink connector, make sure to use the 
`sink_connector!` macro to expose the FFI interface and allow the connector 
runtime to register the sink with the runtime.
+
+And finally, each sink should have its own, custom configuration, which is 
passed along with the unique plugin ID via expected `new()` method.
+
+Let's start by defining the internal state and the public sink connector along 
with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+    invocations_count: usize,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct StdoutSink {
+    id: u32,
+    print_payload: bool,
+    state: Mutex<State>
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StdoutSinkConfig {
+    print_payload: Option<bool>,
+}
+```
+
+```rust
+impl StdoutSink {
+    pub fn new(id: u32, config: StdoutSinkConfig) -> Self {
+        StdoutSink {
+            id,
+            print_payload: config.print_payload.unwrap_or(false),
+            state: Mutex::new(State { invocations_count: 0 }),
+        }
+    }
+}
+```
+
+And we can invoke the expected macro to expose the FFI interface and allow the 
connector runtime to register the sink within the runtime.
+
+```rust
+sink_connector!(StdoutSink);
+```
+
+At a bare minimum, we need to add the following dependencies to the 
`Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- once_cell
+- tracing
+
+Now, let's implement the `Sink` trait for our `StdoutSink` struct.
+
+```rust
+#[async_trait]
+impl Sink for StdoutSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opened stdout sink connector with ID: {}, print payload: {}",
+            self.id, self.print_payload
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        let mut state = self.state.lock().await;
+        state.invocations_count += 1;
+        let invocation = state.invocations_count;
+        drop(state);
+
+        info!(
+            "Stdout sink with ID: {} received: {} messages, schema: {}, 
stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.schema,
+            topic_metadata.stream,
+            topic_metadata.topic,
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+            invocation
+        );
+        if self.print_payload {
+            for message in messages {
+                info!(
+                    "Message offset: {}, payload: {:#?}",
+                    message.offset, message.payload
+                );
+            }
+        }
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Stdout sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+```
+
+It's also important to note, that the supported format(s) might vary depending 
on the connector implementation. For example, you might expect `JSON` as the 
payload format, which can be then easily parsed and processed by upstream 
components such as data transforms, but at the same time, you could support the 
other formats and let the user decide which one to use.
+
+For example, you can match against the `payload` enum field containing the 
deserialized value to process (or not) the consumed message(s).
+
+```rust
+for message in messages {
+    match message.payload {
+        Payload::Json(value) =>  {
+            // Process JSON payload
+        }
+        _ => {
+            warn!("Unsupported payload format: {}", messages_metadata.schema);
+        }
+    }
+}
+```
+
+While the schema of messages (that will be consumed from the Iggy stream) 
cannot be controlled by the sink connector itself, the built-in configuration 
allows you to decide the expected format of the messages (the particular 
`StreamDecoder` will be used).
+
+Keep in mind, that it might be sometimes difficult/impossible e.g. to 
transform one format to another e.g. JSON to SBE or so, and in such a case, the 
consumed messages will be ignored.
+
+Eventually, compile the source code and update the runtime configuration file 
using the example config above (`config.toml` file by default, unless you 
prefer `yaml` or `json` format instead - just make sure that `path` points to 
the existing plugin).
+
+And that's all, enjoy using the sink connector!
+
+On a side note, if you'd like to produce the messages to the Iggy stream 
instead, you can implement your own **[Source 
connector](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
 too :)
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml 
b/core/connectors/sinks/stdout_sink/Cargo.toml
index da705597..cf6a8f4d 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -37,6 +37,7 @@ dashmap = { workspace = true }
 iggy_connector_sdk = { workspace = true }
 once_cell = { workspace = true }
 serde = { workspace = true }
+tokio = { workspace = true }
 tracing = { workspace = true }
 
 [package.metadata.cargo-machete]
diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs 
b/core/connectors/sinks/stdout_sink/src/lib.rs
index b71267da..4ffc4234 100644
--- a/core/connectors/sinks/stdout_sink/src/lib.rs
+++ b/core/connectors/sinks/stdout_sink/src/lib.rs
@@ -21,14 +21,21 @@ use iggy_connector_sdk::{
     ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, 
sink_connector,
 };
 use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
 use tracing::info;
 
 sink_connector!(StdoutSink);
 
+#[derive(Debug)]
+struct State {
+    invocations_count: usize,
+}
+
 #[derive(Debug)]
 pub struct StdoutSink {
     id: u32,
     print_payload: bool,
+    state: Mutex<State>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -41,6 +48,9 @@ impl StdoutSink {
         StdoutSink {
             id,
             print_payload: config.print_payload.unwrap_or(false),
+            state: Mutex::new(State {
+                invocations_count: 0,
+            }),
         }
     }
 }
@@ -49,7 +59,7 @@ impl StdoutSink {
 impl Sink for StdoutSink {
     async fn open(&mut self) -> Result<(), Error> {
         info!(
-            "Initialized stdout sink with ID: {}. Print payload: {}",
+            "Opened stdout sink connector with ID: {}, print payload: {}",
             self.id, self.print_payload
         );
         Ok(())
@@ -61,15 +71,21 @@ impl Sink for StdoutSink {
         messages_metadata: MessagesMetadata,
         messages: Vec<ConsumedMessage>,
     ) -> Result<(), Error> {
+        let mut state = self.state.lock().await;
+        state.invocations_count += 1;
+        let invocation = state.invocations_count;
+        drop(state);
+
         info!(
-            "Stdout sink with ID: {} received: {} messages, schema: {}, 
stream: {}, topic: {}, partition: {}, offset: {}",
+            "Stdout sink with ID: {} received: {} messages, schema: {}, 
stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
             self.id,
             messages.len(),
             messages_metadata.schema,
             topic_metadata.stream,
-            topic_metadata.stream,
+            topic_metadata.topic,
             messages_metadata.partition_id,
-            messages_metadata.current_offset
+            messages_metadata.current_offset,
+            invocation
         );
         if self.print_payload {
             for message in messages {
@@ -83,7 +99,7 @@ impl Sink for StdoutSink {
     }
 
     async fn close(&mut self) -> Result<(), Error> {
-        info!("Stdout sink with ID: {} is shutting down", self.id);
+        info!("Stdout sink connector with ID: {} is closed.", self.id);
         Ok(())
     }
 }
diff --git a/core/connectors/sources/README.md 
b/core/connectors/sources/README.md
new file mode 100644
index 00000000..c152e9f8
--- /dev/null
+++ b/core/connectors/sources/README.md
@@ -0,0 +1,223 @@
+# Apache Iggy Connectors - Source
+
+## Overview
+
+Source connectors are responsible for ingesting data from external sources 
into Apache Iggy. They provide a way to integrate Apache Iggy with various data 
sources, such as databases, message queues, or file systems.
+
+The source is represented by the single `Source` trait, which defines the 
basic interface for all source connectors. It provides methods for initializing 
the source, reading data from it, and closing the source.
+
+```rust
+#[async_trait]
+pub trait Source: Send + Sync {
+    /// Invoked when the source is initialized, allowing it to perform any 
necessary setup.
+    async fn open(&mut self) -> Result<(), Error>;
+
+    /// Invoked every time a batch of messages is produced to the configured 
stream and topic.
+    async fn poll(&self) -> Result<ProducedMessages, Error>;
+
+    /// Invoked when the source is closed, allowing it to perform any 
necessary cleanup.
+    async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+## Configuration
+
+Source is configured in the default `config` file used by the runtime. Each 
source configuration is part of the map of <String, SourceConfig>, which can be 
represented using toml, json, or yaml.
+
+```rust
+pub struct SourceConfig {
+    pub enabled: bool,
+    pub name: String,
+    pub path: String,
+    pub transforms: Option<TransformsConfig>,
+    pub streams: Vec<StreamProducerConfig>,
+    pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a source connector, using `random` as 
its unique identifier:
+
+```toml
+# Required configuration for a source connector
+[sources.random]
+enabled = true # Toggle source on/off
+name = "Random source" # Name of the source
+path = "libiggy_connector_random_source" # Path to the source connector
+
+# Collection of the streams to which the produced messages are sent
+[[sources.random.streams]]
+stream = "example_stream"
+topic = "example_topic"
+schema = "json"
+batch_size = 100
+send_interval = "5ms"
+
+# Custom configuration for the source connector, deserialized to type T from 
`config` field
+[sources.random.config]
+messages_count = 10
+
+# Optional data transformation(s) to be applied before sending messages to the 
stream
+[sources.random.transforms.add_fields]
+enabled = true
+
+# Collection of the fields transforms to be applied before sending messages to 
the stream
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement the example source connector, which will simply generate the N 
random messages depending on the count specified in the configuration.
+
+Additionally, our source connector will have its own state, which can be used 
e.g. to track the overall progress or store some relevant information when 
producing the data from the actual external sources or tooling.
+
+Keep in mind that the produced messages will be sent further to the specified 
stream, but it's already the responsibility of the runtime to handle the 
delivery.
+
+Also, when implementing the source connector, make sure to use the 
`source_connector!` macro to expose the FFI interface and allow the connector 
runtime to register the source with the runtime.
+
+And finally, each source should have its own, custom configuration, which is 
passed along with the unique plugin ID via expected `new()` method.
+
+Let's start by defining the internal state and the public source connector 
along with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+    current_id: usize,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct RandomSource {
+    id: u32,
+    messages_count: u32,
+    state: Mutex<State>
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RandomSourceConfig {
+    messages_count: Option<u32>,
+}
+```
+
+At this point, we can expose the expected `new()` method, which will be used 
by the runtime to create a new instance of the source connector. The `id` is 
assigned by the runtime, and represents the unique identifier of the source 
connector.
+
+```rust
+impl RandomSource {
+    pub fn new(id: u32, config: RandomSourceConfig) -> Self {
+        RandomSource {
+            id,
+            messages_count: config.messages_count.unwrap_or(10),
+            state: Mutex::new(State { current_id: 0 }),
+        }
+    }
+}
+```
+
+And we can invoke the expected macro to expose the FFI interface and allow the 
connector runtime to register the source within the runtime.
+
+```rust
+source_connector!(RandomSource);
+```
+
+At a bare minimum, we need to add the following dependencies to the 
`Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- once_cell
+- tracing
+
+Before we make use of the `Source` trait, let's define the internal payload of 
the message that will be produced (e.g. as if it was pulled from some external 
database or so).
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+    id: u64,
+    text: String,
+}
+```
+
+Now, let's implement the `Source` trait for our `RandomSource` struct. We'll 
assume that the amount of messages (provided in the config), will be generated 
every 100ms to mimic the behavior of a real-world external source.
+
+```rust
+#[async_trait]
+impl Source for RandomSource {
+    async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
+        info!(
+            "Opened random source connector with ID: {}, messages count: {}",
+            self.id, self.messages_count
+        );
+        Ok(())
+    }
+
+    async fn poll(&self) -> Result<ProducedMessages, 
iggy_connector_sdk::Error> {
+        sleep(Duration::from_millis(100)).await;
+        let mut state = self.state.lock().await;
+        let mut current_id = state.current_id;
+
+        let mut messages = Vec::new();
+        for _ in 0..self.messages_count {
+            current_id += 1;
+            let record = Record {
+                id: current_id,
+                text: format!("Hello from random source connector: 
#{current_id}")
+            };
+            let Ok(payload) = simd_json::to_vec(&record) else {
+                error!(
+                    "Failed to serialize record by random source connector 
with ID: {}",
+                    self.id
+                );
+                continue;
+            };
+
+            let message = ProducedMessage {
+                id: None,
+                headers: None,
+                checksum: None,
+                timestamp: None,
+                origin_timestamp: None,
+                payload,
+            };
+            messages.push(message);
+        }
+
+        state.current_id = current_id;
+        info!(
+            "Generated {} messages by random source connector with ID: {}",
+            messages.len(),
+            self.id,
+        );
+        Ok(ProducedMessages {
+            schema: Schema::Json,
+            messages,
+        })
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Random source connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+```
+
+As you can see, the `ProducedMessage` can be customized to fit your needs, as 
all the fields will be directly mapped to the existing Iggy message struct.
+
+It's also important to note, that the supported format(s) might vary depending 
on the connector implementation. For example, you might use `JSON` as the 
payload format, which can be then easily parsed and processed by downstream 
components such as data transforms, but at the same time, you could support the 
other formats and let the user decide which one to use.
+
+While the final schema of messages (that will be appended to the Iggy stream) 
can be controlled with the built-in configuration (the particular 
`StreamEncoder` will be used), keep in mind that it might sometimes be 
difficult/impossible e.g. to transform one format to another e.g. JSON to SBE 
or so, and in such a case, the produced messages will be ignored.
+
+Finally, compile the source code and update the runtime configuration file 
using the example config above (`config.toml` file by default, unless you 
prefer `yaml` or `json` format instead - just make sure that `path` points to 
the existing plugin).
+
+Before starting the runtime, do not forget to create the specified stream 
and topic, e.g. via the Iggy CLI.
+
+```
+iggy --username iggy --password iggy stream create example_stream
+
+iggy --username iggy --password iggy topic create example_stream example_topic 
1 none 1d
+```
+
+And that's all, enjoy using the source connector!
+
+On a side note, if you'd like to process the messages consumed from the Iggy 
stream instead, you can implement your own **[Sink 
connector](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** 
too :)
diff --git a/core/connectors/sources/test_source/Cargo.toml 
b/core/connectors/sources/random_source/Cargo.toml
similarity index 97%
rename from core/connectors/sources/test_source/Cargo.toml
rename to core/connectors/sources/random_source/Cargo.toml
index 4c8bd949..efd6c389 100644
--- a/core/connectors/sources/test_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [package]
-name = "iggy_connector_test_source"
+name = "iggy_connector_random_source"
 version = "0.1.0"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
diff --git a/core/connectors/sources/test_source/src/lib.rs 
b/core/connectors/sources/random_source/src/lib.rs
similarity index 84%
rename from core/connectors/sources/test_source/src/lib.rs
rename to core/connectors/sources/random_source/src/lib.rs
index 69f3ff60..489a2f26 100644
--- a/core/connectors/sources/test_source/src/lib.rs
+++ b/core/connectors/sources/random_source/src/lib.rs
@@ -28,13 +28,13 @@ use rand::{
 };
 use serde::{Deserialize, Serialize};
 use tokio::{sync::Mutex, time::sleep};
-use tracing::info;
+use tracing::{error, info};
 use uuid::Uuid;
 
-source_connector!(TestSource);
+source_connector!(RandomSource);
 
 #[derive(Debug)]
-pub struct TestSource {
+pub struct RandomSource {
     id: u32,
     max_count: Option<usize>,
     interval: Duration,
@@ -44,7 +44,7 @@ pub struct TestSource {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct TestSourceConfig {
+pub struct RandomSourceConfig {
     interval: Option<String>,
     max_count: Option<usize>,
     messages_range: Option<(u32, u32)>,
@@ -56,12 +56,12 @@ struct State {
     current_number: usize,
 }
 
-impl TestSource {
-    pub fn new(id: u32, config: TestSourceConfig) -> Self {
+impl RandomSource {
+    pub fn new(id: u32, config: RandomSourceConfig) -> Self {
         let interval = config.interval.unwrap_or("1s".to_string());
         let interval = humantime::Duration::from_str(&interval)
             .unwrap_or(humantime::Duration::from_str("1s").unwrap());
-        TestSource {
+        RandomSource {
             id,
             max_count: config.max_count,
             interval: *interval,
@@ -84,7 +84,11 @@ impl TestSource {
                 text: self.generate_random_text(),
             };
             let Ok(payload) = simd_json::to_vec(&record) else {
-                panic!("Failed to serialize record");
+                error!(
+                    "Failed to serialize record by random source connector 
with ID: {}",
+                    self.id
+                );
+                continue;
             };
 
             let message = ProducedMessage {
@@ -110,10 +114,10 @@ impl TestSource {
 }
 
 #[async_trait]
-impl Source for TestSource {
+impl Source for RandomSource {
     async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
         info!(
-            "Initialized test source with ID {}. Interval: {:#?}, max offset: 
{:#?}, messages range: {} - {}, payload size: {}",
+            "Initialized random source connector with ID: {}. Interval: {:#?}, 
max offset: {:#?}, messages range: {} - {}, payload size: {}",
             self.id,
             self.interval,
             self.max_count,
@@ -130,7 +134,7 @@ impl Source for TestSource {
         if let Some(max_count) = self.max_count {
             if state.current_number >= max_count {
                 info!(
-                    "Reached max number of {max_count} messages for test 
source with ID {}",
+                    "Reached max number of {max_count} messages for random 
source connector with ID: {}",
                     self.id
                 );
                 return Ok(ProducedMessages {
@@ -143,7 +147,7 @@ impl Source for TestSource {
         let messages = self.generate_messages();
         state.current_number += messages.len();
         info!(
-            "Generated {} messages by test source with ID {}",
+            "Generated {} messages by random source connector with ID: {}",
             messages.len(),
             self.id
         );
@@ -154,7 +158,10 @@ impl Source for TestSource {
     }
 
     async fn close(&mut self) -> Result<(), Error> {
-        info!("Test source with ID {} is shutting down", self.id);
+        info!(
+            "Random source connector with ID: {} is shutting down",
+            self.id
+        );
         Ok(())
     }
 }


Reply via email to