This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors_docs in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8f934e0f7748c4473deb4ac4fa8513b620e8ab59 Author: spetz <[email protected]> AuthorDate: Tue Jun 3 21:29:48 2025 +0200 Add connectors docs, WiP --- Cargo.lock | 1 + core/connectors/README.md | 47 ++---- core/connectors/runtime/README.md | 25 +++ core/connectors/runtime/config.toml | 20 +++ core/connectors/runtime/src/main.rs | 2 +- core/connectors/sdk/README.md | 19 +++ core/connectors/sinks/README.md | 208 +++++++++++++++++++++++++ core/connectors/sinks/stdout_sink/Cargo.toml | 1 + core/connectors/sinks/stdout_sink/src/lib.rs | 26 +++- core/connectors/sources/README.md | 219 +++++++++++++++++++++++++++ 10 files changed, 525 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1556a42..2ed669d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3738,6 +3738,7 @@ dependencies = [ "iggy_connector_sdk", "once_cell", "serde", + "tokio", "tracing", ] diff --git a/core/connectors/README.md b/core/connectors/README.md index 3bbb1d1b..68051e50 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -34,58 +34,31 @@ iggy --username iggy --password iggy topic create qw records 1 none 1d 6. Start the connector runtime `cargo r --bin iggy_connector_runtime -r` - you should be able to browse Quickwit UI with records being constantly added to the `events` index. At the same time, you should see the new messages being added to the `example` stream and `topic1` topic by the test source connector - you can use Iggy Web UI to browse the data. The messages will have applied the basic fields transformations. -## Building the connectors +## Runtime -New connector can be built simply by implementing either `Sink` or `Source` trait. Please check the existing examples under `core/connectors/sinks` and `core/connectors/sources` directories. +All the connectors are implemented as Rust libraries and can be used as a part of the connector runtime. 
The runtime is responsible for managing the lifecycle of the connectors and providing the necessary infrastructure for the connectors to run. For more information, please refer to the **[runtime documentation](https://github.com/apache/iggy/tree/master/core/connectors/runtime)**. -### Sink +## Sink Sinks are responsible for consuming the messages from the configured stream(s) and topic(s) and sending them further to the specified destination. For example, the Quickwit sink connector is responsible for sending the messages to the Quickwit indexer. -```rust -#[async_trait] -pub trait Sink: Send + Sync { - /// Invoked when the sink is initialized, allowing it to perform any necessary setup. - async fn open(&mut self) -> Result<(), Error>; - - /// Invoked every time a batch of messages is received from the configured stream(s) and topic(s). - async fn consume( - &self, - topic_metadata: &TopicMetadata, - messages_metadata: MessagesMetadata, - messages: Vec<ConsumedMessage>, - ) -> Result<(), Error>; - - /// Invoked when the sink is closed, allowing it to perform any necessary cleanup. - async fn close(&mut self) -> Result<(), Error>; -} -``` +Please refer to the **[Sink documentation](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** for the details about the configuration and the sample implementation. When implementing `Sink`, make sure to use the `sink_connector!` macro to expose the FFI interface and allow the connector runtime to register the sink with the runtime. Each sink should have its own, custom configuration, which is passed along with the unique plugin ID via expected `new()` method. -### Source +## Source Sources are responsible for producing the messages to the configured stream(s) and topic(s). For example, the Test source connector will generate the random messages that will be then sent to the configured stream and topic. 
-```rust
-#[async_trait]
-pub trait Source: Send + Sync {
-    /// Invoked when the source is initialized, allowing it to perform any necessary setup.
-    async fn open(&mut self) -> Result<(), Error>;
+Please refer to the **[Source documentation](https://github.com/apache/iggy/tree/master/core/connectors/sources)** for the details about the configuration and the sample implementation.
 
-    /// Invoked every time a batch of messages is produced to the configured stream and topic.
-    async fn poll(&self) -> Result<ProducedMessages, Error>;
-
-    /// Invoked when the source is closed, allowing it to perform any necessary cleanup.
-    async fn close(&mut self) -> Result<(), Error>;
-}
-```
-
-When implementing `Source`, make sure to use the `source_connector!` macro to expose the FFI interface and allow the connector runtime to register the source with the runtime.
-Each source should have its own, custom configuration, which is passed along with the unique plugin ID via expected `new()` method.
+## Building the connectors
 
+A new connector can be built simply by implementing either the `Sink` or the `Source` trait. Please check the **[sink](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** or **[source](https://github.com/apache/iggy/tree/master/core/connectors/sources)** documentation, as well as the existing examples under the `/sinks` and `/sources` directories.
 
 ## Transformations
 
 Field transformations (depending on the supported payload formats) can be applied to the messages either before they are sent to the specified topic (e.g. when produced by the source connectors), or before they are consumed by the sink connectors.
 
 To add a new transformation, simply implement the `Transform` trait and extend the existing `load` function. Each transform may have its own, custom configuration.
+
+To find out more about the transforms, stream decoders or encoders, please refer to the **[SDK documentation](https://github.com/apache/iggy/tree/master/core/connectors/sdk)**.
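To make the `add_fields` idea above concrete, here is a hypothetical, std-only sketch of a field-adding transform. The real `Transform` trait lives in the connectors SDK and its signature differs (it operates on decoded message payloads); this sketch assumes a minimal shape over string fields only.

```rust
use std::collections::BTreeMap;

// Hypothetical minimal shape of a transform; the SDK's actual trait differs.
trait Transform {
    fn transform(&self, fields: BTreeMap<String, String>) -> BTreeMap<String, String>;
}

// Mirrors the `add_fields` configuration: insert a static value under a key.
struct AddFields {
    key: String,
    value: String,
}

impl Transform for AddFields {
    fn transform(&self, mut fields: BTreeMap<String, String>) -> BTreeMap<String, String> {
        fields.insert(self.key.clone(), self.value.clone());
        fields
    }
}

fn main() {
    let add = AddFields { key: "message".into(), value: "hello".into() };
    let mut payload = BTreeMap::new();
    payload.insert("id".to_string(), "1".to_string());
    let out = add.transform(payload);
    assert_eq!(out.get("message").map(String::as_str), Some("hello"));
}
```

The config-driven part (reading `key` and `value.static` from the `transforms` section) would be layered on top of such a trait by the SDK's `load` function.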
diff --git a/core/connectors/runtime/README.md b/core/connectors/runtime/README.md
new file mode 100644
index 00000000..23a35c11
--- /dev/null
+++ b/core/connectors/runtime/README.md
@@ -0,0 +1,25 @@
+# Apache Iggy Connectors - Runtime
+
+The runtime is responsible for managing the lifecycle of the connectors and providing the necessary infrastructure for the connectors to run.
+
+The runtime uses a shared [Tokio runtime](https://tokio.rs) to manage the asynchronous tasks and events across all connectors. Additionally, it has built-in support for logging via the [tracing](https://docs.rs/tracing/latest/tracing/) crate.
+
+The connectors are implemented as Rust libraries, which are loaded dynamically during the runtime initialization process.
+
+Internally, [dlopen2](https://github.com/OpenByteDev/dlopen2) provides a safe and efficient way of loading the plugins via C FFI.
+
+By default, the runtime looks for a configuration file to decide which connectors to load and how to configure them.
+
+The minimal viable configuration requires at least the Iggy credentials, used to create two separate producer and consumer connections.
+
+```toml
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+# token = "secret" # Personal Access Token (PAT) can be used instead of username and password
+```
+
+All the other config sections start with either `sources` or `sinks`, depending on the connector type.
+
+Keep in mind that the `toml`, `yaml`, and `json` formats are all supported for the configuration file. The path to the configuration can be overridden by the `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH` environment variable. Each configuration section can additionally be updated by using the following convention: `IGGY_CONNECTORS_SECTION_NAME.KEY_NAME`, e.g. `IGGY_CONNECTORS_IGGY_USERNAME` and so on.
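For instance, the defaults could be overridden before starting the runtime like this (the config path and values below are only examples; the variable names follow the `IGGY_CONNECTORS_*` convention described above):

```shell
# Example overrides, following the IGGY_CONNECTORS_* convention described above.
export IGGY_CONNECTORS_RUNTIME_CONFIG_PATH="config.yaml"   # example path
export IGGY_CONNECTORS_IGGY_USERNAME="iggy"
export IGGY_CONNECTORS_IGGY_PASSWORD="iggy"
# ...then start the runtime:
# cargo r --bin iggy_connector_runtime -r
```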
diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml index 67b7d5bc..ba0a3a3a 100644 --- a/core/connectors/runtime/config.toml +++ b/core/connectors/runtime/config.toml @@ -21,6 +21,26 @@ username = "iggy" password = "iggy" # token = "secret" # Personal Access Token (PAT) can be used instead of username and password +[sinks.stdout] +enabled = true +name = "Stdout sink" +path = "target/release/libiggy_connector_stdout_sink" + +[[sinks.stdout.streams]] +stream = "example_stream" +topics = ["example_topic"] +schema = "json" +batch_size = 100 +poll_interval = "5ms" +consumer_group = "qw_sink_connector" + +[sinks.stdout.transforms.add_fields] +enabled = true + +[[sinks.stdout.transforms.add_fields.fields]] +key = "message" +value.static = "hello" + [sources.test1] enabled = true name = "Test source" diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index cbffc619..f87057bf 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -83,7 +83,7 @@ async fn main() -> Result<(), RuntimeError> { info!("Starting Iggy Connector Runtime, loading configuration from: {config_path}..."); let builder = Config::builder() .add_source(File::with_name(&config_path)) - .add_source(Environment::with_prefix("IGGY").separator("_")); + .add_source(Environment::with_prefix("IGGY_CONNECTORS").separator("_")); let config: RuntimeConfig = builder .build() diff --git a/core/connectors/sdk/README.md b/core/connectors/sdk/README.md new file mode 100644 index 00000000..13ee85fa --- /dev/null +++ b/core/connectors/sdk/README.md @@ -0,0 +1,19 @@ +# Apache Iggy Connectors - SDK + +SDK provides the commonly used structs and traits such as `Sink` and `Source`, along with the `sink_connector` and `source_connector` macros to be used when developing connectors. 
+
+Moreover, it contains both the `decoders` and `encoders` modules, implementing either the `StreamDecoder` or `StreamEncoder` trait, which are used when consuming or producing data from/to Iggy streams.
+
+The SDK is a work in progress, and it would certainly benefit from supporting multiple schema formats, such as Protobuf, Avro, Flatbuffers etc., including decoding/encoding the data between the different formats (when applicable) and supporting the data transformations whenever possible (easy for JSON, but complex for e.g. Bincode).
+
+Last but not least, the different `transforms` are available to transform (add, update, delete etc.) the particular fields of the data being processed, via external configuration. It's as simple as adding a new transform to the `transforms` section of the particular connector configuration:
+
+```toml
+[sources.random.transforms.add_fields]
+enabled = true
+
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md
new file mode 100644
index 00000000..248da159
--- /dev/null
+++ b/core/connectors/sinks/README.md
@@ -0,0 +1,208 @@
+# Apache Iggy Connectors - Sink
+
+## Overview
+
+Sink connectors are responsible for writing data from Iggy streams to external systems or destinations. They provide a way to integrate Apache Iggy with various data sources and destinations, enabling seamless data flow and processing.
+
+The sink is represented by the single `Sink` trait, which defines the basic interface for all sink connectors. It provides methods for initializing the sink, writing data to the external destination, and closing the sink.
+
+```rust
+#[async_trait]
+pub trait Sink: Send + Sync {
+    /// Invoked when the sink is initialized, allowing it to perform any necessary setup.
+    async fn open(&mut self) -> Result<(), Error>;
+
+    /// Invoked every time a batch of messages is received from the configured stream(s) and topic(s).
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error>;
+
+    /// Invoked when the sink is closed, allowing it to perform any necessary cleanup.
+    async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+## Configuration
+
+The sink is configured in the default `config` file used by the runtime. Each sink configuration is part of a `<String, SinkConfig>` map, which can be represented using toml, json, or yaml.
+
+```rust
+pub struct SinkConfig {
+    pub enabled: bool,
+    pub name: String,
+    pub path: String,
+    pub transforms: Option<TransformsConfig>,
+    pub streams: Vec<StreamConsumerConfig>,
+    pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a sink connector, using `stdout` as its unique identifier:
+
+```toml
+# Required configuration for a sink connector
+[sinks.stdout]
+enabled = true
+name = "Stdout sink"
+path = "target/release/libiggy_connector_stdout_sink"
+
+# Collection of the streams from which messages are consumed
+[[sinks.stdout.streams]]
+stream = "example_stream"
+topics = ["example_topic"]
+schema = "json"
+batch_size = 100
+poll_interval = "5ms"
+consumer_group = "stdout_sink_connector"
+
+# Custom configuration for the sink connector, deserialized to type T from the `config` field
+[sinks.stdout.config]
+print_payload = true

+# Optional data transformation(s) to be applied after consuming messages from the stream
+[sinks.stdout.transforms.add_fields]
+enabled = true
+
+# Collection of the field transforms to be applied after consuming messages from the stream
+[[sinks.stdout.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement an example sink connector, which will simply print the messages to the standard output.
+
+Additionally, our sink connector will have its own state, which can be used e.g. to track the overall progress or store some relevant information when ingesting the data further into the external sources or tooling.
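Since the runtime loads connectors dynamically via `dlopen2`, the plugin crate has to be built as a shared library. The exact `crate-type` below is an assumption — check the manifest of an existing connector such as `stdout_sink` for the authoritative setup:

```toml
# Assumed [lib] section for a dynamically loaded connector plugin;
# verify against the existing connectors' Cargo.toml files.
[lib]
crate-type = ["cdylib"]
```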
+
+Also, when implementing the sink connector, make sure to use the `sink_connector!` macro to expose the FFI interface and allow the connector runtime to register the sink with the runtime.
+
+And finally, each sink should have its own, custom configuration, which is passed along with the unique plugin ID via the expected `new()` method.
+
+Let's start by defining the internal state and the public sink connector along with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+    invocations_count: usize,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct StdoutSink {
+    id: u32,
+    print_payload: bool,
+    state: Mutex<State>,
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StdoutSinkConfig {
+    print_payload: Option<bool>,
+}
+```
+
+```rust
+impl StdoutSink {
+    pub fn new(id: u32, config: StdoutSinkConfig) -> Self {
+        StdoutSink {
+            id,
+            print_payload: config.print_payload.unwrap_or(false),
+            state: Mutex::new(State { invocations_count: 0 }),
+        }
+    }
+}
+```
+
+And we can invoke the expected macro to expose the FFI interface and allow the connector runtime to register the sink within the runtime.
+
+```rust
+sink_connector!(StdoutSink);
+```
+
+At a bare minimum, we need to add the following dependencies to the `Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- iggy_connector_sdk
+- once_cell
+- serde
+- tokio
+- tracing
+
+Now, let's implement the `Sink` trait for our `StdoutSink` struct.
+
+```rust
+#[async_trait]
+impl Sink for StdoutSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Opened stdout sink connector with ID: {}, print payload: {}",
+            self.id, self.print_payload
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        let mut state = self.state.lock().await;
+        state.invocations_count += 1;
+        let invocation = state.invocations_count;
+        drop(state);
+
+        info!(
+            "Stdout sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.schema,
+            topic_metadata.stream,
+            topic_metadata.topic,
+            messages_metadata.partition_id,
+            messages_metadata.current_offset,
+            invocation
+        );
+        if self.print_payload {
+            for message in messages {
+                info!(
+                    "Message offset: {}, payload: {:#?}",
+                    message.offset, message.payload
+                );
+            }
+        }
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Stdout sink connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+```
+
+It's also important to note that the supported format(s) might vary depending on the connector implementation. For example, you might expect `JSON` as the payload format, which can then be easily parsed and processed by upstream components such as data transforms, but at the same time, you could support other formats and let the user decide which one to use.
+
+For example, you can match against the `payload` enum field containing the deserialized value to process (or not) the consumed message(s).
+
+```rust
+for message in messages {
+    match message.payload {
+        Payload::Json(value) => {
+            // Process JSON payload
+        }
+        _ => {
+            warn!("Unsupported payload format: {}", messages_metadata.schema);
+        }
+    }
+}
+```
+
+While the schema of the messages (that will be consumed from the Iggy stream) cannot be controlled by the sink connector itself, the built-in configuration allows you to decide what the expected format of the messages is (the particular `StreamDecoder` will be used).
+
+Keep in mind that it might sometimes be difficult or impossible to transform one format to another, e.g. JSON to SBE or so, and in such a case, the consumed messages will be ignored.
+
+Eventually, compile the source code and update the runtime configuration file using the example config above (`config.toml` file by default, unless you prefer the `yaml` or `json` format instead - just make sure that `path` points to the existing plugin).
+
+And that's all, enjoy using the sink connector!
+
+On a side note, if you'd like to produce the messages to the Iggy stream instead, you can implement your own **[Source connector](https://github.com/apache/iggy/tree/master/core/connectors/sources)** too :)
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml
index da705597..cf6a8f4d 100644
--- a/core/connectors/sinks/stdout_sink/Cargo.toml
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -37,6 +37,7 @@ dashmap = { workspace = true }
 iggy_connector_sdk = { workspace = true }
 once_cell = { workspace = true }
 serde = { workspace = true }
+tokio = { workspace = true }
 tracing = { workspace = true }
 
 [package.metadata.cargo-machete]
diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs b/core/connectors/sinks/stdout_sink/src/lib.rs
index b71267da..4ffc4234 100644
--- a/core/connectors/sinks/stdout_sink/src/lib.rs
+++ b/core/connectors/sinks/stdout_sink/src/lib.rs
@@ -21,14 +21,21 @@ use iggy_connector_sdk::{
     ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, sink_connector,
 };
 use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
 use tracing::info;
 
 sink_connector!(StdoutSink);
 
+#[derive(Debug)]
+struct State {
+    invocations_count: usize,
+}
+
 #[derive(Debug)]
 pub struct StdoutSink {
     id: u32,
     print_payload: bool,
+    state: Mutex<State>,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -41,6 +48,9 @@ impl StdoutSink {
         StdoutSink {
             id,
             print_payload: config.print_payload.unwrap_or(false),
+            state: Mutex::new(State {
+                invocations_count: 0,
+            }),
         }
     }
 }
@@ -49,7 +59,7 @@ impl StdoutSink {
 impl Sink for StdoutSink {
     async fn open(&mut self) -> Result<(), Error> {
         info!(
-            "Initialized stdout sink with ID: {}. Print payload: {}",
+            "Opened stdout sink connector with ID: {}, print payload: {}",
             self.id, self.print_payload
         );
         Ok(())
@@ -61,15 +71,21 @@ impl Sink for StdoutSink {
         messages_metadata: MessagesMetadata,
         messages: Vec<ConsumedMessage>,
     ) -> Result<(), Error> {
+        let mut state = self.state.lock().await;
+        state.invocations_count += 1;
+        let invocation = state.invocations_count;
+        drop(state);
+
         info!(
-            "Stdout sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}",
+            "Stdout sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}, invocation: {}",
             self.id,
             messages.len(),
             messages_metadata.schema,
             topic_metadata.stream,
-            topic_metadata.stream,
+            topic_metadata.topic,
             messages_metadata.partition_id,
-            messages_metadata.current_offset
+            messages_metadata.current_offset,
+            invocation
         );
         if self.print_payload {
             for message in messages {
@@ -83,7 +99,7 @@ impl Sink for StdoutSink {
     }
 
     async fn close(&mut self) -> Result<(), Error> {
-        info!("Stdout sink with ID: {} is shutting down", self.id);
+        info!("Stdout sink connector with ID: {} is closed.", self.id);
         Ok(())
     }
 }
diff --git a/core/connectors/sources/README.md b/core/connectors/sources/README.md
new file mode 100644
index 00000000..686c7939
--- /dev/null
+++ b/core/connectors/sources/README.md
@@ -0,0 +1,219 @@
+# Apache Iggy Connectors - Source
+
+## Overview
+
+Source connectors are responsible for ingesting data from external sources into Apache Iggy. They provide a way to integrate Apache Iggy with various data sources, such as databases, message queues, or file systems.
+
+The source is represented by the single `Source` trait, which defines the basic interface for all source connectors. It provides methods for initializing the source, reading data from it, and closing the source.
+
+```rust
+#[async_trait]
+pub trait Source: Send + Sync {
+    /// Invoked when the source is initialized, allowing it to perform any necessary setup.
+    async fn open(&mut self) -> Result<(), Error>;
+
+    /// Invoked every time a batch of messages is produced to the configured stream and topic.
+    async fn poll(&self) -> Result<ProducedMessages, Error>;
+
+    /// Invoked when the source is closed, allowing it to perform any necessary cleanup.
+    async fn close(&mut self) -> Result<(), Error>;
+}
+```
+
+## Configuration
+
+The source is configured in the default `config` file used by the runtime. Each source configuration is part of a `<String, SourceConfig>` map, which can be represented using toml, json, or yaml.
+
+```rust
+pub struct SourceConfig {
+    pub enabled: bool,
+    pub name: String,
+    pub path: String,
+    pub transforms: Option<TransformsConfig>,
+    pub streams: Vec<StreamProducerConfig>,
+    pub config: Option<serde_json::Value>,
+}
+```
+
+Below is the example configuration for a source connector, using `random` as its unique identifier:
+
+```toml
+# Required configuration for a source connector
+[sources.random]
+enabled = true # Toggle source on/off
+name = "Random source" # Name of the source
+path = "libiggy_connector_random_source" # Path to the source connector
+
+# Collection of the streams to which the produced messages are sent
+[[sources.random.streams]]
+stream = "example_stream"
+topic = "example_topic"
+schema = "json"
+batch_size = 100
+send_interval = "5ms"
+
+# Custom configuration for the source connector, deserialized to type T from the `config` field
+[sources.random.config]
+messages_count = 10
+
+# Optional data transformation(s) to be applied before sending messages to the stream
+[sources.random.transforms.add_fields]
+enabled = true
+
+# Collection of the field transforms to be applied before sending messages to the stream
+[[sources.random.transforms.add_fields.fields]]
+key = "message"
+value.static = "hello"
+```
+
+## Sample implementation
+
+Let's implement an example source connector, which will simply generate N random messages, depending on the count specified in the configuration.
+
+Additionally, our source connector will have its own state, which can be used e.g. to track the overall progress or store some relevant information when producing the data from the actual external sources or tooling.
+
+Keep in mind that the produced messages will be sent further to the specified stream, but it's already the responsibility of the runtime to handle the delivery.
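To illustrate why the sample below guards its counter with a mutex, here is a std-only sketch of the same state pattern: each poll takes the lock, assigns consecutive IDs for the batch, and persists the last one, so IDs stay unique across batches. (The names are hypothetical, and the real connector uses `tokio::sync::Mutex`, since `poll()` is async.)

```rust
use std::sync::Mutex;

// Hypothetical, simplified stand-ins for the connector's state and source.
struct State {
    current_id: u64,
}

struct RandomSourceSketch {
    messages_count: u64,
    state: Mutex<State>,
}

impl RandomSourceSketch {
    // Synchronous analogue of poll(): returns the IDs assigned to one batch.
    fn poll_ids(&self) -> Vec<u64> {
        let mut state = self.state.lock().unwrap();
        (0..self.messages_count)
            .map(|_| {
                state.current_id += 1;
                state.current_id
            })
            .collect()
    }
}

fn main() {
    let source = RandomSourceSketch {
        messages_count: 3,
        state: Mutex::new(State { current_id: 0 }),
    };
    assert_eq!(source.poll_ids(), vec![1, 2, 3]);
    assert_eq!(source.poll_ids(), vec![4, 5, 6]); // IDs continue across batches
}
```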
+
+Also, when implementing the source connector, make sure to use the `source_connector!` macro to expose the FFI interface and allow the connector runtime to register the source with the runtime.
+
+And finally, each source should have its own, custom configuration, which is passed along with the unique plugin ID via the expected `new()` method.
+
+Let's start by defining the internal state and the public source connector along with its own configuration.
+
+```rust
+#[derive(Debug)]
+struct State {
+    current_id: u64,
+}
+```
+
+```rust
+#[derive(Debug)]
+pub struct RandomSource {
+    id: u32,
+    messages_count: u32,
+    state: Mutex<State>,
+}
+```
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RandomSourceConfig {
+    messages_count: Option<u32>,
+}
+```
+
+At this point, we can expose the expected `new()` method, which will be used by the runtime to create a new instance of the source connector. The `id` is assigned by the runtime, and represents the unique identifier of the source connector.
+
+```rust
+impl RandomSource {
+    pub fn new(id: u32, config: RandomSourceConfig) -> Self {
+        RandomSource {
+            id,
+            messages_count: config.messages_count.unwrap_or(10),
+            state: Mutex::new(State { current_id: 0 }),
+        }
+    }
+}
+```
+
+And we can invoke the expected macro to expose the FFI interface and allow the connector runtime to register the source within the runtime.
+
+```rust
+source_connector!(RandomSource);
+```
+
+At a bare minimum, we need to add the following dependencies to the `Cargo.toml` file to compile the plugin at all:
+
+- dashmap
+- iggy_connector_sdk
+- once_cell
+- serde
+- simd_json
+- tokio
+- tracing
+
+Before we make use of the `Source` trait, let's define the internal payload of the message that will be produced (e.g. as if it was pulled from some external database or so).
+
+```rust
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+    id: u64,
+    text: String,
+}
+```
+
+Now, let's implement the `Source` trait for our `RandomSource` struct.
+We'll assume that the number of messages (provided in the config) will be generated every 100ms, to mimic the behavior of a real-world external source.
+
+```rust
+#[async_trait]
+impl Source for RandomSource {
+    async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
+        info!(
+            "Opened random source connector with ID: {}, messages count: {}",
+            self.id, self.messages_count
+        );
+        Ok(())
+    }
+
+    async fn poll(&self) -> Result<ProducedMessages, iggy_connector_sdk::Error> {
+        sleep(Duration::from_millis(100)).await;
+        let mut state = self.state.lock().await;
+        let mut current_id = state.current_id;
+
+        let mut messages = Vec::new();
+        for _ in 0..self.messages_count {
+            current_id += 1;
+            let record = Record {
+                id: current_id,
+                text: format!("Hello from Random Source Connector: #{current_id}"),
+            };
+            let Ok(payload) = simd_json::to_vec(&record) else {
+                panic!("Failed to serialize record");
+            };
+
+            let message = ProducedMessage {
+                id: None,
+                headers: None,
+                checksum: None,
+                timestamp: None,
+                origin_timestamp: None,
+                payload,
+            };
+            messages.push(message);
+        }
+
+        state.current_id = current_id;
+        info!(
+            "Generated {} messages by random source with ID: {}",
+            messages.len(),
+            self.id,
+        );
+        Ok(ProducedMessages {
+            schema: Schema::Json,
+            messages,
+        })
+    }
+
+    async fn close(&mut self) -> Result<(), Error> {
+        info!("Random source connector with ID: {} is closed.", self.id);
+        Ok(())
+    }
+}
+```
+
+As you can see, the `ProducedMessage` can be customized to fit your needs, as all the fields will be directly mapped to the existing Iggy message struct.
+
+It's also important to note that the supported format(s) might vary depending on the connector implementation. For example, you might use `JSON` as the payload format, which can then be easily parsed and processed by downstream components such as data transforms, but at the same time, you could support other formats and let the user decide which one to use.
+
+While the final schema of the messages (that will be appended to the Iggy stream) can be controlled with the built-in configuration (the particular `StreamEncoder` will be used), keep in mind that it might sometimes be difficult or impossible to transform one format to another, e.g. JSON to SBE or so, and in such a case, the produced messages will be ignored.
+
+Eventually, compile the source code and update the runtime configuration file using the example config above (`config.toml` file by default, unless you prefer the `yaml` or `json` format instead - just make sure that `path` points to the existing plugin).
+
+And before starting the runtime, do not forget to create the specified stream and topic e.g. via Iggy CLI.
+
+```
+iggy --username iggy --password iggy stream create example_stream
+
+iggy --username iggy --password iggy topic create example_stream example_topic 1 none 1d
+```
+
+And that's all, enjoy using the source connector!
+
+On a side note, if you'd like to process the messages consumed from the Iggy stream instead, you can implement your own **[Sink connector](https://github.com/apache/iggy/tree/master/core/connectors/sinks)** too :)
