This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ae1470cd docs(connectors): add documentation to connectors and example
plugins (#1836)
ae1470cd is described below
commit ae1470cdfd082967df9a233959f9219479fbffba
Author: Piotr Gankiewicz <[email protected]>
AuthorDate: Thu Jun 5 22:05:05 2025 +0200
docs(connectors): add documentation to connectors and example plugins
(#1836)
---
Cargo.lock | 33 +--
Cargo.toml | 2 +-
DEPENDENCIES.md | 2 +-
core/connectors/README.md | 47 +----
core/connectors/runtime/README.md | 25 +++
core/connectors/runtime/config.toml | 34 +++-
core/connectors/runtime/src/main.rs | 2 +-
core/connectors/sdk/README.md | 19 ++
core/connectors/sinks/README.md | 213 ++++++++++++++++++++
core/connectors/sinks/stdout_sink/Cargo.toml | 1 +
core/connectors/sinks/stdout_sink/src/lib.rs | 26 ++-
core/connectors/sources/README.md | 223 +++++++++++++++++++++
.../{test_source => random_source}/Cargo.toml | 2 +-
.../{test_source => random_source}/src/lib.rs | 33 +--
14 files changed, 580 insertions(+), 82 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b1556a42..d2b1d828 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3686,6 +3686,23 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "iggy_connector_random_source"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "humantime",
+ "iggy_connector_sdk",
+ "once_cell",
+ "rand 0.9.1",
+ "serde",
+ "simd-json",
+ "tokio",
+ "tracing",
+ "uuid",
+]
+
[[package]]
name = "iggy_connector_runtime"
version = "0.1.0"
@@ -3738,24 +3755,8 @@ dependencies = [
"iggy_connector_sdk",
"once_cell",
"serde",
- "tracing",
-]
-
-[[package]]
-name = "iggy_connector_test_source"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "dashmap",
- "humantime",
- "iggy_connector_sdk",
- "once_cell",
- "rand 0.9.1",
- "serde",
- "simd-json",
"tokio",
"tracing",
- "uuid",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 1f2053f7..e8fa444e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,7 @@ members = [
"core/connectors/sdk",
"core/connectors/sinks/quickwit_sink",
"core/connectors/sinks/stdout_sink",
- "core/connectors/sources/test_source",
+ "core/connectors/sources/random_source",
"core/examples",
"core/integration",
"core/sdk",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index c86bd8ea..65ae29a2 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -322,10 +322,10 @@ iggy-cli: 0.9.0, "Apache-2.0",
iggy_binary_protocol: 0.7.0, "Apache-2.0",
iggy_common: 0.7.0, "Apache-2.0",
iggy_connector_quickwit_sink: 0.1.0, "Apache-2.0",
+iggy_connector_random_source: 0.1.0, "Apache-2.0",
iggy_connector_runtime: 0.1.0, "Apache-2.0",
iggy_connector_sdk: 0.1.0, "Apache-2.0",
iggy_connector_stdout_sink: 0.1.0, "Apache-2.0",
-iggy_connector_test_source: 0.1.0, "Apache-2.0",
iggy_examples: 0.0.5, "Apache-2.0",
impl-more: 0.1.9, "Apache-2.0 OR MIT",
implicit-clone: 0.4.9, "Apache-2.0 OR MIT",
diff --git a/core/connectors/README.md b/core/connectors/README.md
index 3bbb1d1b..68051e50 100644
--- a/core/connectors/README.md
+++ b/core/connectors/README.md
@@ -34,58 +34,31 @@ iggy --username iggy --password iggy topic create qw
records 1 none 1d
6. Start the connector runtime `cargo r --bin iggy_connector_runtime -r` - you
should be able to browse Quickwit UI with records being constantly added to the
`events` index. At the same time, you should see the new messages being added
to the `example` stream and `topic1` topic by the test source connector - you
can use Iggy Web UI to browse the data. The messages will have applied the
basic fields transformations.
-## Building the connectors
+## Runtime
-New connector can be built simply by implementing either `Sink` or `Source`
trait. Please check the existing examples under `core/connectors/sinks` and
`core/connectors/sources` directories.
+All the connectors are implemented as Rust libraries and can be used as a part
of the connector runtime. The runtime is responsible for managing the lifecycle
of the connectors and providing the necessary infrastructure for the connectors
to run. For more information, please refer to the **[runtime
documentation](https://github.com/apache/iggy/tree/master/core/connectors/runtime)**.
-### Sink
+## Sink
Sinks are responsible for consuming the messages from the configured stream(s)
and topic(s) and sending them further to the specified destination. For
example, the Quickwit sink connector is responsible for sending the messages to
the Quickwit indexer.
-```rust
-#[async_trait]
-pub trait Sink: Send + Sync {
- /// Invoked when the sink is initialized, allowing it to perform any
necessary setup.
- async fn open(&mut self) -> Result<(), Error>;
-
- /// Invoked every time a batch of messages is received from the configured
stream(s) and topic(s).
- async fn consume(
- &self,
- topic_metadata: &TopicMetadata,
- messages_metadata: MessagesMetadata,
- messages: Vec<ConsumedMessage>,
- ) -> Result<(), Error>;
-
- /// Invoked when the sink is closed, allowing it to perform any necessary
cleanup.
- async fn close(&mut self) -> Result<(), Error>;
-}
-```
+Please refer to the **[Sink
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sinks)**
for the details about the configuration and the sample implementation.
When implementing `Sink`, make sure to use the `sink_connector!` macro to
expose the FFI interface and allow the connector runtime to register the sink
with the runtime.
Each sink should have its own, custom configuration, which is passed along
with the unique plugin ID via expected `new()` method.
-### Source
+## Source
Sources are responsible for producing the messages to the configured stream(s)
and topic(s). For example, the Test source connector will generate the random
messages that will be then sent to the configured stream and topic.
-```rust
-#[async_trait]
-pub trait Source: Send + Sync {
- /// Invoked when the source is initialized, allowing it to perform any
necessary setup.
- async fn open(&mut self) -> Result<(), Error>;
+Please refer to the **[Source
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
for the details about the configuration and the sample implementation.
- /// Invoked every time a batch of messages is produced to the configured
stream and topic.
- async fn poll(&self) -> Result<ProducedMessages, Error>;
-
- /// Invoked when the source is closed, allowing it to perform any
necessary cleanup.
- async fn close(&mut self) -> Result<(), Error>;
-}
-```
-
-When implementing `Source`, make sure to use the `source_connector!` macro to
expose the FFI interface and allow the connector runtime to register the source
with the runtime.
-Each source should have its own, custom configuration, which is passed along
with the unique plugin ID via expected `new()` method.
+## Building the connectors
+New connector can be built simply by implementing either `Sink` or `Source`
trait. Please check the
**[sink](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** or
**[source](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
documentation, as well as the existing examples under `/sinks` and `/sources`
directories.
## Transformations
Field transformations (depending on the supported payload formats) can be
applied to the messages either before they are sent to the specified topic
(e.g. when produced by the source connectors), or before consumed by the sink
connectors. To add the new transformation, simply implement the `Transform`
trait and extend the existing `load` function. Each transform may have its own,
custom configuration.
+
+To find out more about the transforms, stream decoders or encoders, please
refer to the **[SDK
documentation](https://github.com/apache/iggy/tree/master/core/connectors/sdk)**.
diff --git a/core/connectors/runtime/README.md
b/core/connectors/runtime/README.md
new file mode 100644
index 00000000..23a35c11
--- /dev/null
+++ b/core/connectors/runtime/README.md
@@ -0,0 +1,25 @@
+# Apache Iggy Connectors - Runtime
+
+Runtime is responsible for managing the lifecycle of the connectors and
providing the necessary infrastructure for the connectors to run.
+
+The runtime uses a shared [Tokio runtime](https://tokio.rs) to manage the
asynchronous tasks and events across all connectors. Additionally, it has
built-in support for logging via
[tracing](https://docs.rs/tracing/latest/tracing/) crate.
+
+The connectors are implemented as Rust libraries, and these are loaded
dynamically during the runtime initialization process.
+
+Internally, [dlopen2](https://github.com/OpenByteDev/dlopen2) provides a safe
and efficient way of loading the plugins via C FFI.
+
+By default, runtime will look for the configuration file, to decide which
connectors to load and how to configure them.
+
+The minimal viable configuration requires at least the Iggy credentials, to
create 2 separate instances of producer & consumer connections.
+
+```toml
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+# token = "secret" # Personal Access Token (PAT) can be used instead of
username and password
+```
+
+All the other config sections start either with `sources` or `sinks` depending
on the connector type.
+
+Keep in mind that either of `toml`, `yaml`, or `json` formats are supported
for the configuration file. The path to the configuration can be overridden by
`IGGY_CONNECTORS_RUNTIME_CONFIG_PATH` environment variable. Each configuration
section can be also additionally updated by using the following convention
`IGGY_CONNECTORS_SECTION_NAME.KEY_NAME` e.g. `IGGY_CONNECTORS_IGGY_USERNAME`
and so on.
diff --git a/core/connectors/runtime/config.toml
b/core/connectors/runtime/config.toml
index 67b7d5bc..d7361472 100644
--- a/core/connectors/runtime/config.toml
+++ b/core/connectors/runtime/config.toml
@@ -21,28 +21,48 @@ username = "iggy"
password = "iggy"
# token = "secret" # Personal Access Token (PAT) can be used instead of
username and password
-[sources.test1]
+[sinks.stdout]
enabled = true
-name = "Test source"
-path = "target/release/libiggy_connector_test_source"
+name = "Stdout sink"
+path = "target/release/libiggy_connector_stdout_sink"
-[[sources.test1.streams]]
+[[sinks.stdout.streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_size = 100
+poll_interval = "5ms"
+consumer_group = "stdout_sink_connector"
+
+[sinks.stdout.transforms.add_fields]
+enabled = true
+
+[[sinks.stdout.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+
+[sources.random]
+enabled = true
+name = "Random source"
+path = "target/release/libiggy_connector_random_source"
+
+[[sources.random.streams]]
stream = "example"
topic = "topic1"
schema = "json"
batch_size = 1000
send_interval = "5ms"
-[sources.test1.config]
+[sources.random.config]
interval = "100ms"
# max_count = 1000
messages_range = [10, 50]
payload_size = 200
-[sources.test1.transforms.add_fields]
+[sources.random.transforms.add_fields]
enabled = true
-[[sources.test1.transforms.add_fields.fields]]
+[[sources.random.transforms.add_fields.fields]]
key = "test_field"
value.static = "hello!"
diff --git a/core/connectors/runtime/src/main.rs
b/core/connectors/runtime/src/main.rs
index cbffc619..f87057bf 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -83,7 +83,7 @@ async fn main() -> Result<(), RuntimeError> {
info!("Starting Iggy Connector Runtime, loading configuration from:
{config_path}...");
let builder = Config::builder()
.add_source(File::with_name(&config_path))
- .add_source(Environment::with_prefix("IGGY").separator("_"));
+
.add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_"));
let config: RuntimeConfig = builder
.build()
diff --git a/core/connectors/sdk/README.md b/core/connectors/sdk/README.md
new file mode 100644
index 00000000..13ee85fa
--- /dev/null
+++ b/core/connectors/sdk/README.md
@@ -0,0 +1,19 @@
+# Apache Iggy Connectors - SDK
+
+SDK provides the commonly used structs and traits such as `Sink` and `Source`,
along with the `sink_connector` and `source_connector` macros to be used when
developing connectors.
+
+Moreover, it contains both the `decoders` and `encoders` modules,
implementing either `StreamDecoder` or `StreamEncoder` traits, which are used
when consuming or producing data from/to Iggy streams.
+
+SDK is WIP, and it'd certainly benefit from having the support of multiple
format schemas, such as Protobuf, Avro, Flatbuffers etc. including
decoding/encoding the data between the different formats (when applicable) and
supporting the data transformations whenever possible (easy for JSON, but
complex for Bincode for example).
+
+
+Last but not least, the different `transforms` are available, to transform
(add, update, delete etc.) the particular fields of the data being processed
via external configuration. It's as simple as adding a new transform to the
`transforms` section of the particular connector configuration:
+
+```toml
+[sources.random.transforms.add_fields]
+enabled = true
+
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
new file mode 100644
index 00000000..78541e30
--- /dev/null
+++ b/core/connectors/sinks/README.md
@@ -0,0 +1,213 @@
+# Apache Iggy Connectors - Sink
+
+## Overview
+
+Sink connectors are responsible for writing data from Iggy streams to external
systems or destinations. They provide a way to integrate Apache Iggy with
various data sources and destinations, enabling seamless data flow and
processing.
+
+The sink is represented by the single `Sink` trait, which defines the basic
interface for all sink connectors. It provides methods for initializing the
sink, writing data to external destination, and closing the sink.
+
+```rust
+#[async_trait]
+pub trait Sink: Send + Sync {
+ /// Invoked when the sink is initialized, allowing it to perform any
necessary setup.
+ async fn open(&mut self) -> Result<(), Error>;
+
+ /// Invoked every time a batch of messages is received from the configured
stream(s) and topic(s).
+ async fn consume(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: MessagesMetadata,
+ messages: Vec<ConsumedMessage>,
+ ) -> Result<(), Error>;
+
+ /// Invoked when the sink is closed, allowing it to perform any necessary
cleanup.
+ async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+
+## Configuration
+
+Sink is configured in the default `config` file used by runtime. Each sink
configuration is part of the map of <String, SinkConfig>, which can be
represented using toml, json, or yaml.
+
+```rust
+pub struct SinkConfig {
+ pub enabled: bool,
+ pub name: String,
+ pub path: String,
+ pub transforms: Option<TransformsConfig>,
+ pub streams: Vec<StreamConsumerConfig>,
+ pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a sink connector, using `stdout` as
its unique identifier:
+
+```toml
+# Required configuration for a sink connector
+[sinks.stdout]
+enabled = true
+name = "Stdout sink"
+path = "target/release/libiggy_connector_stdout_sink"
+
+# Collection of the streams from which messages are consumed
+[[sinks.stdout.streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_size = 100
+poll_interval = "5ms"
+consumer_group = "stdout_sink_connector"
+
+# Custom configuration for the sink connector, deserialized to type T from
`config` field
+[sinks.stdout.config]
+print_payload = true
+
+# Optional data transformation(s) to be applied after consuming messages from
the stream
+[sinks.stdout.transforms.add_fields]
+enabled = true
+
+# Collection of the fields transforms to be applied after consuming messages
from the stream
+[[sinks.stdout.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement the example sink connector, which will simply print the
messages to the standard output.
+
+Additionally, our sink connector will have its own state, which can be used
e.g. to track the overall progress or store some relevant information when
ingesting the data further to the external sources or tooling.
+
+Also, when implementing the sink connector, make sure to use the
`sink_connector!` macro to expose the FFI interface and allow the connector
runtime to register the sink with the runtime.
+
+And finally, each sink should have its own, custom configuration, which is
passed along with the unique plugin ID via expected `new()` method.
+
+Let's start by defining the internal state and the public sink connector along
with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+ invocations_count: usize,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct StdoutSink {
+ id: u32,
+ print_payload: bool,
+ state: Mutex<State>
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StdoutSinkConfig {
+ print_payload: Option<bool>,
+}
+```
+
+```rust
+impl StdoutSink {
+ pub fn new(id: u32, config: StdoutSinkConfig) -> Self {
+ StdoutSink {
+ id,
+ print_payload: config.print_payload.unwrap_or(false),
+ state: Mutex::new(State { invocations_count: 0 }),
+ }
+ }
+}
+```
+
+We can invoke the expected macro to expose the FFI interface and allow the
connector runtime to register the sink within the runtime.
+
+```rust
+sink_connector!(StdoutSink);
+```
+
+At a bare minimum, we need to add the following dependencies to the
`Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- once_cell
+- tracing
+
+Now, let's implement the `Sink` trait for our `StdoutSink` struct.
+
+```rust
+#[async_trait]
+impl Sink for StdoutSink {
+ async fn open(&mut self) -> Result<(), Error> {
+ info!(
+ "Opened stdout sink connector with ID: {}, print payload: {}",
+ self.id, self.print_payload
+ );
+ Ok(())
+ }
+
+ async fn consume(
+ &self,
+ topic_metadata: &TopicMetadata,
+ messages_metadata: MessagesMetadata,
+ messages: Vec<ConsumedMessage>,
+ ) -> Result<(), Error> {
+ let mut state = self.state.lock().await;
+ state.invocations_count += 1;
+ let invocation = state.invocations_count;
+ drop(state);
+
+ info!(
+ "Stdout sink with ID: {} received: {} messages, schema: {},
stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
+ self.id,
+ messages.len(),
+ messages_metadata.schema,
+ topic_metadata.stream,
+ topic_metadata.topic,
+ messages_metadata.partition_id,
+ messages_metadata.current_offset,
+ invocation
+ );
+ if self.print_payload {
+ for message in messages {
+ info!(
+ "Message offset: {}, payload: {:#?}",
+ message.offset, message.payload
+ );
+ }
+ }
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<(), Error> {
+ info!("Stdout sink connector with ID: {} is closed.", self.id);
+ Ok(())
+ }
+}
+```
+
+It's also important to note, that the supported format(s) might vary depending
on the connector implementation. For example, you might expect `JSON` as the
payload format, which can be then easily parsed and processed by upstream
components such as data transforms, but at the same time, you could support the
other formats and let the user decide which one to use.
+
+For example, you can match against the `payload` enum field containing the
deserialized value to process (or not) the consumed message(s).
+
+```rust
+for message in messages {
+ match message.payload {
+ Payload::Json(value) => {
+ // Process JSON payload
+ }
+ _ => {
+ warn!("Unsupported payload format: {}", messages_metadata.schema);
+ }
+ }
+}
+```
+
+While the schema of messages (that will be consumed from the Iggy stream),
cannot be controlled by the sink connector itself, the built-in configuration
allows to decide what's the expected format of the messages (the particular
`StreamDecoder` will be used).
+
+Keep in mind, that it might be sometimes difficult/impossible e.g. to
transform one format to another e.g. JSON to SBE or so, and in such a case, the
consumed messages will be ignored.
+
+Eventually, compile the source code and update the runtime configuration file
using the example config above (`config.toml` file by default, unless you
prefer `yaml` or `json` format instead - just make sure that `path` points to
the existing plugin).
+
+And that's all, enjoy using the sink connector!
+
+On a side note, if you'd like to produce the messages to the Iggy stream
instead, you can implement your own **[Source
connector](https://github.com/apache/iggy/tree/master/core/connectors/sources)**
too :)
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml
b/core/connectors/sinks/stdout_sink/Cargo.toml
index da705597..cf6a8f4d 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -37,6 +37,7 @@ dashmap = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
+tokio = { workspace = true }
tracing = { workspace = true }
[package.metadata.cargo-machete]
diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs
b/core/connectors/sinks/stdout_sink/src/lib.rs
index b71267da..4ffc4234 100644
--- a/core/connectors/sinks/stdout_sink/src/lib.rs
+++ b/core/connectors/sinks/stdout_sink/src/lib.rs
@@ -21,14 +21,21 @@ use iggy_connector_sdk::{
ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata,
sink_connector,
};
use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
use tracing::info;
sink_connector!(StdoutSink);
+#[derive(Debug)]
+struct State {
+ invocations_count: usize,
+}
+
#[derive(Debug)]
pub struct StdoutSink {
id: u32,
print_payload: bool,
+ state: Mutex<State>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -41,6 +48,9 @@ impl StdoutSink {
StdoutSink {
id,
print_payload: config.print_payload.unwrap_or(false),
+ state: Mutex::new(State {
+ invocations_count: 0,
+ }),
}
}
}
@@ -49,7 +59,7 @@ impl StdoutSink {
impl Sink for StdoutSink {
async fn open(&mut self) -> Result<(), Error> {
info!(
- "Initialized stdout sink with ID: {}. Print payload: {}",
+ "Opened stdout sink connector with ID: {}, print payload: {}",
self.id, self.print_payload
);
Ok(())
@@ -61,15 +71,21 @@ impl Sink for StdoutSink {
messages_metadata: MessagesMetadata,
messages: Vec<ConsumedMessage>,
) -> Result<(), Error> {
+ let mut state = self.state.lock().await;
+ state.invocations_count += 1;
+ let invocation = state.invocations_count;
+ drop(state);
+
info!(
- "Stdout sink with ID: {} received: {} messages, schema: {},
stream: {}, topic: {}, partition: {}, offset: {}",
+ "Stdout sink with ID: {} received: {} messages, schema: {},
stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
self.id,
messages.len(),
messages_metadata.schema,
topic_metadata.stream,
- topic_metadata.stream,
+ topic_metadata.topic,
messages_metadata.partition_id,
- messages_metadata.current_offset
+ messages_metadata.current_offset,
+ invocation
);
if self.print_payload {
for message in messages {
@@ -83,7 +99,7 @@ impl Sink for StdoutSink {
}
async fn close(&mut self) -> Result<(), Error> {
- info!("Stdout sink with ID: {} is shutting down", self.id);
+ info!("Stdout sink connector with ID: {} is closed.", self.id);
Ok(())
}
}
diff --git a/core/connectors/sources/README.md
b/core/connectors/sources/README.md
new file mode 100644
index 00000000..f4d39cae
--- /dev/null
+++ b/core/connectors/sources/README.md
@@ -0,0 +1,223 @@
+# Apache Iggy Connectors - Source
+
+## Overview
+
+Source connectors are responsible for ingesting data from external sources
into Apache Iggy. They provide a way to integrate Apache Iggy with various data
sources, such as databases, message queues, or file systems.
+
+The source is represented by the single `Source` trait, which defines the
basic interface for all source connectors. It provides methods for initializing
the source, reading data from it, and closing the source.
+
+```rust
+#[async_trait]
+pub trait Source: Send + Sync {
+ /// Invoked when the source is initialized, allowing it to perform any
necessary setup.
+ async fn open(&mut self) -> Result<(), Error>;
+
+ /// Invoked every time a batch of messages is produced to the configured
stream and topic.
+ async fn poll(&self) -> Result<ProducedMessages, Error>;
+
+ /// Invoked when the source is closed, allowing it to perform any
necessary cleanup.
+ async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+## Configuration
+
+Source is configured in the default `config` file used by runtime. Each source
configuration is part of the map of <String, SourceConfig>, which can be
represented using toml, json, or yaml.
+
+```rust
+pub struct SourceConfig {
+ pub enabled: bool,
+ pub name: String,
+ pub path: String,
+ pub transforms: Option<TransformsConfig>,
+ pub streams: Vec<StreamProducerConfig>,
+ pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a source connector, using `random` as
its unique identifier:
+
+```toml
+# Required configuration for a source connector
+[sources.random]
+enabled = true # Toggle source on/off
+name = "Random source" # Name of the source
+path = "libiggy_connector_random_source" # Path to the source connector
+
+# Collection of the streams to which the produced messages are sent
+[[sources.random.streams]]
+stream = "example_stream"
+topic = "example_topic"
+schema = "json"
+batch_size = 100
+send_interval = "5ms"
+
+# Custom configuration for the source connector, deserialized to type T from
`config` field
+[sources.random.config]
+messages_count = 10
+
+# Optional data transformation(s) to be applied before sending messages to the
stream
+[sources.random.transforms.add_fields]
+enabled = true
+
+# Collection of the fields transforms to be applied before sending messages to
the stream
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement the example source connector, which will simply generate the N
random messages depending on the count specified in the configuration.
+
+Additionally, our source connector will have its own state, which can be used
e.g. to track the overall progress or store some relevant information when
producing the data from the actual external sources or tooling.
+
+Keep in mind, that the produced messages will be sent further to the specified
stream, however it's already the responsibility of the runtime to handle the
delivery.
+
+Also, when implementing the source connector, make sure to use the
`source_connector!` macro to expose the FFI interface and allow the connector
runtime to register the source with the runtime.
+
+And finally, each source should have its own, custom configuration, which is
passed along with the unique plugin ID via expected `new()` method.
+
+Let's start by defining the internal state and the public source connector
along with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+ current_id: usize,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct RandomSource {
+ id: u32,
+ messages_count: u32,
+ state: Mutex<State>
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RandomSourceConfig {
+ messages_count: Option<u32>,
+}
+```
+
+At this point, we can expose the expected `new()` method, which will be used
by the runtime to create a new instance of the source connector. The `id` is
assigned by the runtime, and represents the unique identifier of the source
connector.
+
+```rust
+impl RandomSource {
+ pub fn new(id: u32, config: RandomSourceConfig) -> Self {
+ RandomSource {
+ id,
+            messages_count: config.messages_count.unwrap_or(100),
+ state: Mutex::new(State { current_id: 0 }),
+ }
+ }
+}
+```
+
+We can invoke the expected macro to expose the FFI interface and allow the
connector runtime to register the source within the runtime.
+
+```rust
+source_connector!(RandomSource);
+```
+
+At a bare minimum, we need to add the following dependencies to the
`Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- once_cell
+- tracing
+
+Before we make use of the `Source` trait, let's define the internal payload of
the message that will be produced (e.g. as if it was pulled from some external
database or so).
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+ id: u64,
+ text: String,
+}
+```
+
+Now, let's implement the `Source` trait for our `RandomSource` struct. We'll
assume that the amount of messages (provided in the config), will be generated
every 100ms to mimic the behavior of a real-world external source.
+
+```rust
+#[async_trait]
+impl Source for RandomSource {
+ async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
+ info!(
+ "Opened random source connector with ID: {}, messages count: {}",
+ self.id, self.messages_count
+ );
+ Ok(())
+ }
+
+ async fn poll(&self) -> Result<ProducedMessages,
iggy_connector_sdk::Error> {
+ sleep(Duration::from_millis(100)).await;
+ let mut state = self.state.lock().await;
+        let mut current_id = state.current_id;
+
+ let mut messages = Vec::new();
+ for _ in 0..self.messages_count {
+ current_id += 1;
+ let record = Record {
+                id: current_id as u64,
+ text: format!("Hello from random source connector:
#{current_id}")
+ };
+ let Ok(payload) = simd_json::to_vec(&record) else {
+ error!(
+ "Failed to serialize record by random source connector
with ID: {}",
+ self.id
+ );
+ continue;
+ };
+
+ let message = ProducedMessage {
+ id: None,
+ headers: None,
+ checksum: None,
+ timestamp: None,
+ origin_timestamp: None,
+ payload,
+ };
+ messages.push(message);
+ }
+
+        state.current_id = current_id;
+ info!(
+            "Generated {} messages by random source connector with ID: {}",
+ messages.len(),
+ self.id,
+ );
+ Ok(ProducedMessages {
+ schema: Schema::Json,
+ messages,
+ })
+ }
+
+ async fn close(&mut self) -> Result<(), Error> {
+ info!("Random source connector with ID: {} is closed.", self.id);
+ Ok(())
+ }
+}
+```
+
+As you can see, the `ProducedMessage` can be customized to fit your needs, as
all the fields will be directly mapped to the existing Iggy message struct.
+
+It's also important to note, that the supported format(s) might vary depending
on the connector implementation. For example, you might use `JSON` as the
payload format, which can be then easily parsed and processed by downstream
components such as data transforms, but at the same time, you could support the
other formats and let the user decide which one to use.
+
+While the final schema of messages (that will be appended to the Iggy stream),
can be controlled with the built-in configuration (the particular
`StreamEncoder` will be used), keep in mind, that it might be sometimes
difficult/impossible e.g. to transform one format to another e.g. JSON to SBE
or so, and in such a case, the produced messages will be ignored.
+
+Eventually, compile the source code and update the runtime configuration file
using the example config above (`config.toml` file by default, unless you
prefer `yaml` or `json` format instead - just make sure that `path` points to
the existing plugin).
+
+And before starting the runtime, do not forget to create the specified stream
and topic e.g. via Iggy CLI.
+
+```
+iggy --username iggy --password iggy stream create example_stream
+
+iggy --username iggy --password iggy topic create example_stream example_topic
1 none 1d
+```
+
+And that's all, enjoy using the source connector!
+
+On a side note, if you'd like to process the messages consumed from the Iggy
stream instead, you can implement your own **[Sink
connector](https://github.com/apache/iggy/tree/master/core/connectors/sinks)**
too :)
diff --git a/core/connectors/sources/test_source/Cargo.toml
b/core/connectors/sources/random_source/Cargo.toml
similarity index 97%
rename from core/connectors/sources/test_source/Cargo.toml
rename to core/connectors/sources/random_source/Cargo.toml
index 4c8bd949..efd6c389 100644
--- a/core/connectors/sources/test_source/Cargo.toml
+++ b/core/connectors/sources/random_source/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
-name = "iggy_connector_test_source"
+name = "iggy_connector_random_source"
version = "0.1.0"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
diff --git a/core/connectors/sources/test_source/src/lib.rs
b/core/connectors/sources/random_source/src/lib.rs
similarity index 84%
rename from core/connectors/sources/test_source/src/lib.rs
rename to core/connectors/sources/random_source/src/lib.rs
index 69f3ff60..489a2f26 100644
--- a/core/connectors/sources/test_source/src/lib.rs
+++ b/core/connectors/sources/random_source/src/lib.rs
@@ -28,13 +28,13 @@ use rand::{
};
use serde::{Deserialize, Serialize};
use tokio::{sync::Mutex, time::sleep};
-use tracing::info;
+use tracing::{error, info};
use uuid::Uuid;
-source_connector!(TestSource);
+source_connector!(RandomSource);
#[derive(Debug)]
-pub struct TestSource {
+pub struct RandomSource {
id: u32,
max_count: Option<usize>,
interval: Duration,
@@ -44,7 +44,7 @@ pub struct TestSource {
}
#[derive(Debug, Serialize, Deserialize)]
-pub struct TestSourceConfig {
+pub struct RandomSourceConfig {
interval: Option<String>,
max_count: Option<usize>,
messages_range: Option<(u32, u32)>,
@@ -56,12 +56,12 @@ struct State {
current_number: usize,
}
-impl TestSource {
- pub fn new(id: u32, config: TestSourceConfig) -> Self {
+impl RandomSource {
+ pub fn new(id: u32, config: RandomSourceConfig) -> Self {
let interval = config.interval.unwrap_or("1s".to_string());
let interval = humantime::Duration::from_str(&interval)
.unwrap_or(humantime::Duration::from_str("1s").unwrap());
- TestSource {
+ RandomSource {
id,
max_count: config.max_count,
interval: *interval,
@@ -84,7 +84,11 @@ impl TestSource {
text: self.generate_random_text(),
};
let Ok(payload) = simd_json::to_vec(&record) else {
- panic!("Failed to serialize record");
+ error!(
+ "Failed to serialize record by random source connector
with ID: {}",
+ self.id
+ );
+ continue;
};
let message = ProducedMessage {
@@ -110,10 +114,10 @@ impl TestSource {
}
#[async_trait]
-impl Source for TestSource {
+impl Source for RandomSource {
async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
info!(
- "Initialized test source with ID {}. Interval: {:#?}, max offset:
{:#?}, messages range: {} - {}, payload size: {}",
+ "Initialized random source connector with ID: {}. Interval: {:#?},
max offset: {:#?}, messages range: {} - {}, payload size: {}",
self.id,
self.interval,
self.max_count,
@@ -130,7 +134,7 @@ impl Source for TestSource {
if let Some(max_count) = self.max_count {
if state.current_number >= max_count {
info!(
- "Reached max number of {max_count} messages for test
source with ID {}",
+ "Reached max number of {max_count} messages for random
source connector with ID: {}",
self.id
);
return Ok(ProducedMessages {
@@ -143,7 +147,7 @@ impl Source for TestSource {
let messages = self.generate_messages();
state.current_number += messages.len();
info!(
- "Generated {} messages by test source with ID {}",
+ "Generated {} messages by random source connector with ID: {}",
messages.len(),
self.id
);
@@ -154,7 +158,10 @@ impl Source for TestSource {
}
async fn close(&mut self) -> Result<(), Error> {
- info!("Test source with ID {} is shutting down", self.id);
+ info!(
+ "Random source connector with ID: {} is shutting down",
+ self.id
+ );
Ok(())
}
}