This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 115184f1e7291d7becac5d9fdaeae2522ee72fbb
Author: spetz <[email protected]>
AuthorDate: Sat May 24 21:11:54 2025 +0200

    feat(repo): add connectors runtime
---
 Cargo.lock                                         | 352 ++++++++++++++++++++
 Cargo.toml                                         |  21 +-
 core/connectors/README.md                          |  35 ++
 core/connectors/data_producer/Cargo.toml           |  40 +++
 core/connectors/data_producer/src/main.rs          | 146 +++++++++
 core/connectors/docker-compose.yml                 |  38 +++
 core/connectors/runtime/Cargo.toml                 |  47 +++
 core/connectors/runtime/config.toml                | 131 ++++++++
 core/connectors/runtime/src/main.rs                | 355 ++++++++++++++++++++
 core/connectors/runtime/src/sink.rs                | 359 +++++++++++++++++++++
 core/connectors/runtime/src/source.rs              | 301 +++++++++++++++++
 core/connectors/runtime/src/transform.rs           |  64 ++++
 core/connectors/sdk/Cargo.toml                     |  46 +++
 core/connectors/sdk/src/decoders/json.rs           |  37 +++
 core/connectors/sdk/src/decoders/mod.rs            |  21 ++
 core/connectors/sdk/src/decoders/raw.rs            |  31 ++
 core/connectors/sdk/src/decoders/text.rs           |  37 +++
 core/connectors/sdk/src/encoders/json.rs           |  39 +++
 core/connectors/sdk/src/encoders/mod.rs            |  21 ++
 core/connectors/sdk/src/encoders/raw.rs            |  34 ++
 core/connectors/sdk/src/encoders/text.rs           |  37 +++
 core/connectors/sdk/src/lib.rs                     | 239 ++++++++++++++
 core/connectors/sdk/src/sink.rs                    | 240 ++++++++++++++
 core/connectors/sdk/src/source.rs                  | 218 +++++++++++++
 core/connectors/sdk/src/transforms/add_fields.rs   | 128 ++++++++
 .../connectors/sdk/src/transforms/delete_fields.rs |  61 ++++
 core/connectors/sdk/src/transforms/mod.rs          |  45 +++
 core/connectors/sinks/quickwit_sink/Cargo.toml     |  46 +++
 core/connectors/sinks/quickwit_sink/src/lib.rs     | 209 ++++++++++++
 core/connectors/sinks/stdout_sink/Cargo.toml       |  43 +++
 core/connectors/sinks/stdout_sink/src/lib.rs       |  88 +++++
 core/connectors/sources/test_source/Cargo.toml     |  47 +++
 core/connectors/sources/test_source/src/lib.rs     | 157 +++++++++
 core/server/Cargo.toml                             |   3 +
 34 files changed, 3713 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6df9d5a5..492c2335 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -420,6 +420,12 @@ version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
 
+[[package]]
+name = "arraydeque"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
+
 [[package]]
 name = "arrayref"
 version = "0.3.9"
@@ -595,6 +601,15 @@ dependencies = [
  "bytemuck",
 ]
 
+[[package]]
+name = "atomic-polyfill"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4"
+dependencies = [
+ "critical-section",
+]
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -1013,6 +1028,9 @@ name = "bitflags"
 version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "bitvec"
@@ -1407,6 +1425,12 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "cobs"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15"
+
 [[package]]
 name = "colorchoice"
 version = "1.0.3"
@@ -1452,6 +1476,25 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "config"
+version = "0.15.11"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "595aae20e65c3be792d05818e8c63025294ac3cb7e200f11459063a352a6ef80"
+dependencies = [
+ "async-trait",
+ "convert_case 0.6.0",
+ "json5",
+ "pathdiff",
+ "ron",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "toml",
+ "winnow 0.7.10",
+ "yaml-rust2",
+]
+
 [[package]]
 name = "console-api"
 version = "0.8.1"
@@ -1621,6 +1664,12 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "critical-section"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
+
 [[package]]
 name = "crossbeam"
 version = "0.8.4"
@@ -1979,6 +2028,29 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "dlopen2"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9e1297103d2bbaea85724fcee6294c2d50b1081f9ad47d0f6f6f61eda65315a6"
+dependencies = [
+ "dlopen2_derive",
+ "libc",
+ "once_cell",
+ "winapi",
+]
+
+[[package]]
+name = "dlopen2_derive"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f2b99bf03862d7f545ebc28ddd33a665b50865f4dfd84031a393823879bd4c54"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "dlv-list"
 version = "0.5.2"
@@ -2039,6 +2111,18 @@ version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
 
+[[package]]
+name = "embedded-io"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced"
+
+[[package]]
+name = "embedded-io"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d"
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.35"
@@ -3017,6 +3101,16 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "halfbrown"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "aa2c385c6df70fd180bbb673d93039dbd2cd34e41d782600bdf6e1ca7bce39aa"
+dependencies = [
+ "hashbrown 0.15.3",
+ "serde",
+]
+
 [[package]]
 name = "handlebars"
 version = "4.5.0"
@@ -3031,6 +3125,15 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
+[[package]]
+name = "hash32"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
+dependencies = [
+ "byteorder",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -3057,6 +3160,15 @@ dependencies = [
  "foldhash",
 ]
 
+[[package]]
+name = "hashlink"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
+dependencies = [
+ "hashbrown 0.15.3",
+]
+
 [[package]]
 name = "hdrhistogram"
 version = "7.5.4"
@@ -3070,6 +3182,20 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "heapless"
+version = "0.7.17"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
+dependencies = [
+ "atomic-polyfill",
+ "hash32",
+ "rustc_version",
+ "serde",
+ "spin",
+ "stable_deref_trait",
+]
+
 [[package]]
 name = "heck"
 version = "0.5.0"
@@ -3545,6 +3671,107 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "iggy_connector_data_producer"
+version = "0.1.0"
+dependencies = [
+ "chrono",
+ "iggy",
+ "rand 0.9.1",
+ "serde",
+ "serde_json",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
+name = "iggy_connector_quickwit_sink"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "iggy_connector_sdk",
+ "once_cell",
+ "reqwest",
+ "serde",
+ "serde_yml",
+ "simd-json",
+ "tracing",
+]
+
+[[package]]
+name = "iggy_connector_runtime"
+version = "0.1.0"
+dependencies = [
+ "config",
+ "dashmap",
+ "dlopen2",
+ "flume",
+ "futures",
+ "iggy",
+ "iggy_connector_sdk",
+ "mimalloc",
+ "once_cell",
+ "postcard",
+ "serde",
+ "serde_json",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
+name = "iggy_connector_sdk"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "dashmap",
+ "iggy",
+ "once_cell",
+ "postcard",
+ "serde",
+ "serde_json",
+ "simd-json",
+ "strum_macros",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+ "uuid",
+]
+
+[[package]]
+name = "iggy_connector_stdout_sink"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "iggy_connector_sdk",
+ "once_cell",
+ "serde",
+ "tracing",
+]
+
+[[package]]
+name = "iggy_connector_test_source"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "humantime",
+ "iggy_connector_sdk",
+ "once_cell",
+ "rand 0.9.1",
+ "serde",
+ "simd-json",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "iggy_examples"
 version = "0.0.5"
@@ -3810,6 +4037,17 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "json5"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
+dependencies = [
+ "pest",
+ "pest_derive",
+ "serde",
+]
+
 [[package]]
 name = "jsonwebtoken"
 version = "9.3.1"
@@ -3984,6 +4222,16 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "libyml"
+version = "0.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980"
+dependencies = [
+ "anyhow",
+ "version_check",
+]
+
 [[package]]
 name = "libz-sys"
 version = "1.1.22"
@@ -4816,6 +5064,12 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "pathdiff"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
+
 [[package]]
 name = "pbkdf2"
 version = "0.12.2"
@@ -4992,6 +5246,19 @@ dependencies = [
  "portable-atomic",
 ]
 
+[[package]]
+name = "postcard"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "170a2601f67cc9dba8edd8c4870b15f71a6a2dc196daec8c83f72b59dff628a8"
+dependencies = [
+ "cobs",
+ "embedded-io 0.4.0",
+ "embedded-io 0.6.1",
+ "heapless",
+ "serde",
+]
+
 [[package]]
 name = "potential_utf"
 version = "0.1.2"
@@ -5446,6 +5713,26 @@ dependencies = [
  "thiserror 2.0.12",
 ]
 
+[[package]]
+name = "ref-cast"
+version = "1.0.24"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4a0ae411dbe946a674d89546582cea4ba2bb8defac896622d6496f14c23ba5cf"
+dependencies = [
+ "ref-cast-impl",
+]
+
+[[package]]
+name = "ref-cast-impl"
+version = "1.0.24"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "regex"
 version = "1.11.1"
@@ -5638,6 +5925,18 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "ron"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
+dependencies = [
+ "base64 0.21.7",
+ "bitflags 2.9.0",
+ "serde",
+ "serde_derive",
+]
+
 [[package]]
 name = "route-recognizer"
 version = "0.3.1"
@@ -6105,6 +6404,21 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "serde_yml"
+version = "0.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "59e2dd588bf1597a252c3b920e0143eb99b0f76e4e082f4c92ce34fbc9e71ddd"
+dependencies = [
+ "indexmap 2.9.0",
+ "itoa",
+ "libyml",
+ "memchr",
+ "ryu",
+ "serde",
+ "version_check",
+]
+
 [[package]]
 name = "serial_test"
 version = "3.2.0"
@@ -6254,6 +6568,21 @@ version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
 
+[[package]]
+name = "simd-json"
+version = "0.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c962f626b54771990066e5435ec8331d1462576cd2d1e62f24076ae014f92112"
+dependencies = [
+ "getrandom 0.3.3",
+ "halfbrown",
+ "ref-cast",
+ "serde",
+ "serde_json",
+ "simdutf8",
+ "value-trait",
+]
+
 [[package]]
 name = "simdutf8"
 version = "0.1.5"
@@ -7170,6 +7499,18 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
 
+[[package]]
+name = "value-trait"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0508fce11ad19e0aab49ce20b6bec7f8f82902ded31df1c9fc61b90f0eb396b8"
+dependencies = [
+ "float-cmp",
+ "halfbrown",
+ "itoa",
+ "ryu",
+]
+
 [[package]]
 name = "vcpkg"
 version = "0.2.15"
@@ -8037,6 +8378,17 @@ dependencies = [
  "lzma-sys",
 ]
 
+[[package]]
+name = "yaml-rust2"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "18b783b2c2789414f8bb84ca3318fc9c2d7e7be1c22907d37839a58dedb369d3"
+dependencies = [
+ "arraydeque",
+ "encoding_rs",
+ "hashlink",
+]
+
 [[package]]
 name = "yansi"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 97035f30..3a4984f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,12 @@ members = [
     "core/binary_protocol",
     "core/cli",
     "core/common",
+    "core/connectors/data_producer",
+    "core/connectors/runtime",
+    "core/connectors/sdk",
+    "core/connectors/sinks/quickwit_sink",
+    "core/connectors/sinks/stdout_sink",
+    "core/connectors/sources/test_source",
     "core/examples",
     "core/integration",
     "core/sdk",
@@ -68,16 +74,18 @@ byte-unit = { version = "5.1.6", default-features = false, 
features = [
 ] }
 bytes = "1.10.1"
 charming = "0.4.0"
-chrono = "0.4.41"
+chrono = { version = "0.4.41", features = ["serde"] }
 clap = { version = "4.5.37", features = ["derive"] }
+config = { version = "0.15.11" }
 comfy-table = "7.1.4"
 crc32fast = "1.4.2"
 crossbeam = "0.8.4"
 dashmap = "6.1.0"
+derive_builder = "0.20.2"
 derive_more = { version = "2.0.1", features = ["full"] }
 derive-new = "0.7.0"
 dirs = "6.0.0"
-derive_builder = "0.20.2"
+dlopen2 = "0.7.0"
 enum_dispatch = "0.3.13"
 figlet-rs = "0.1.5"
 flume = "0.11.1"
@@ -87,9 +95,11 @@ human-repr = "1.1.0"
 humantime = "2.2.0"
 keyring = { version = "3.6.2", features = ["sync-secret-service", "vendored"] }
 nonzero_lit = "0.1.2"
+once_cell = "1.21.3"
 openssl = { version = "0.10.72", features = ["vendored"] }
 passterm = "=2.0.1"
 quinn = "0.11.8"
+postcard = { version = "1.1.1", features = ["alloc"] }
 rand = "0.9.1"
 reqwest = { version = "0.12.15", default-features = false, features = [
     "json",
@@ -101,7 +111,9 @@ rustls = { version = "0.23.27", features = ["ring"] }
 serde = { version = "1.0.219", features = ["derive", "rc"] }
 serde_json = "1.0.140"
 serde_with = { version = "3.12.0", features = ["base64", "macros"] }
+serde_yml = "0.0.12"
 serial_test = "3.2.0"
+simd-json = { version = "0.15.1", features = ["serde_impl"] }
 sysinfo = "0.35.0"
 tempfile = "3.19.1"
 thiserror = "2.0.12"
@@ -116,6 +128,7 @@ tracing-subscriber = { version = "0.3.19", default-features 
= false, features =
     "ansi",
 ] }
 uuid = { version = "1.16.0", features = [
+    "v4",
     "v7",
     "fast-rng",
     "serde",
@@ -123,6 +136,7 @@ uuid = { version = "1.16.0", features = [
 ] }
 rust-s3 = { version = "0.35.1", features = ["default"] }
 strum = { version = "0.27.1", features = ["derive"] }
+strum_macros = "0.27.1"
 aes-gcm = "0.10.3"
 base64 = "0.22.1"
 twox-hash = { version = "2.1.0", features = ["xxhash32"] }
@@ -151,8 +165,9 @@ mimalloc = "0.1"
 console-subscriber = "0.4.1"
 
 # Path dependencies
-iggy_common = { path = "core/common", version = "0.7.0" }
 iggy_binary_protocol = { path = "core/binary_protocol", version = "0.7.0" }
+iggy_common = { path = "core/common", version = "0.7.0" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" }
 iggy = { path = "core/sdk", version = "0.7.0" }
 server = { path = "core/server" }
 integration = { path = "core/integration" }
diff --git a/core/connectors/README.md b/core/connectors/README.md
new file mode 100644
index 00000000..dc026d52
--- /dev/null
+++ b/core/connectors/README.md
@@ -0,0 +1,35 @@
+# Apache Iggy Connectors
+
+The highly performant and modular runtime for statically typed, yet 
dynamically loaded connectors. Ingest the data from the external sources and 
push it further to the Iggy streams, or fetch the data from the Iggy streams 
and push it further to the external sinks. Create your own Rust plugins by 
simply implementing either the `Source` or `Sink` trait and build custom 
pipelines for the data processing.
+
+## Features
+- **High Performance**: Utilizes Rust's performance characteristics to ensure 
fast data ingestion and egress.
+- **Low memory footprint**: Designed with memory efficiency in mind, 
minimizing the memory footprint of the connectors.
+- **Modular Design**: Designed with modularity in mind, allowing for easy 
extension and customization.
+- **Dynamic Loading**: Supports dynamic loading of plugins, enabling seamless 
integration with various data sources and sinks.
+- **Statically Typed**: Ensures type safety and compile-time checks, reducing 
runtime errors.
+- **Easy Customization**: Provides a simple interface for implementing custom 
connectors, making it easy to create new plugins.
+- **Data transformation**: Supports data transformation with the help of 
existing functions.
+- **Powerful configuration**: Define your sinks, sources, and transformations 
in the configuration file.
+
+## Quick Start
+
+1. Build the project in release mode, and make sure that the plugins specified 
in `core/connectors/runtime/config.toml` under `path` are available. The 
configuration file can be written in any of the `toml`, `json` or `yaml` formats.
+
+2. Run `docker compose up -d` from `/core/connectors` which will start the 
Quickwit server to be used by an example sink connector.
+
+3. Set environment variable 
`IGGY_CONNECTORS_RUNTIME_CONFIG_PATH=core/connectors/runtime/config` pointing 
to the runtime configuration file.
+
+4. Start the Iggy server and invoke the following commands to create the 
example streams and topics used by the connectors.
+
+```
+iggy --username iggy --password iggy stream create example
+iggy --username iggy --password iggy stream create qw
+iggy --username iggy --password iggy topic create qw records 1 none 1d
+```
+
+5. Execute `cargo r --bin iggy_connector_data_producer -r` which will start 
the example data producer application, sending the messages to previously 
created `qw` stream and `records` topic.
+
+6. Start the connector runtime `cargo r --bin iggy_connector_runtime -r` - you 
should be able to browse Quickwit UI at `http://localhost:7280` with records 
being constantly added to the `events` index (this is part of the Quickwit 
sink). At the same time, you should see the new messages being added to the 
`example` stream and `topic1` topic by the test source connector - you can use 
Iggy Web UI to browse the data.
+
+A new connector can be built simply by implementing either the `Sink` or the 
`Source` trait. Please check the existing examples under the 
`core/connectors/sinks` and `core/connectors/sources` directories.
diff --git a/core/connectors/data_producer/Cargo.toml 
b/core/connectors/data_producer/Cargo.toml
new file mode 100644
index 00000000..59945a16
--- /dev/null
+++ b/core/connectors/data_producer/Cargo.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_data_producer"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[dependencies]
+chrono = { workspace = true }
+iggy = { workspace = true }
+rand = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/connectors/data_producer/src/main.rs 
b/core/connectors/data_producer/src/main.rs
new file mode 100644
index 00000000..2607d955
--- /dev/null
+++ b/core/connectors/data_producer/src/main.rs
@@ -0,0 +1,146 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{env, str::FromStr, time::Duration};
+
+use chrono::{DateTime, Days, Utc};
+use iggy::prelude::{
+    Client, IggyClient, IggyClientBuilder, IggyDuration, IggyError, 
IggyMessage, Partitioning,
+};
+use rand::{
+    Rng,
+    distr::{Alphanumeric, Uniform},
+};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use tokio::time::sleep;
+use tracing::info;
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
+
+const SOURCES: [&str; 6] = ["browser", "mobile", "desktop", "email", 
"network", "other"]; // candidate values for `Record::source`, picked at random by `random_record`
+const STATES: [&str; 5] = ["active", "inactive", "blocked", "deleted", 
"unknown"]; // candidate values for `Record::state`, picked at random by `random_record`
+const DOMAINS: [&str; 5] = [ // e-mail domains appended to the random local part of `Record::email`
+    "gmail.com",
+    "yahoo.com",
+    "hotmail.com",
+    "outlook.com",
+    "aol.com",
+];
+
+/// Runs the example data producer.
+///
+/// Initializes tracing, connects to the Iggy server and sends batches of
+/// randomly generated JSON records to the configured stream and topic until
+/// 100000 batches have been sent, sleeping 10 ms after each batch.
+///
+/// The `IGGY_ADDRESS`, `IGGY_USERNAME`, `IGGY_PASSWORD`, `IGGY_STREAM` and
+/// `IGGY_TOPIC` environment variables override the built-in defaults.
+///
+/// # Errors
+///
+/// Returns a [`DataProducerError`] when creating the client, building or
+/// initializing the producer, or sending a batch fails.
+#[tokio::main]
+async fn main() -> Result<(), DataProducerError> {
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+    info!("Starting data producer...");
+    // Defaults are built lazily, only when the variable is missing or invalid.
+    let address = env::var("IGGY_ADDRESS").unwrap_or_else(|_| "localhost:8090".to_owned());
+    let username = env::var("IGGY_USERNAME").unwrap_or_else(|_| "iggy".to_owned());
+    let password = env::var("IGGY_PASSWORD").unwrap_or_else(|_| "iggy".to_owned());
+    let stream = env::var("IGGY_STREAM").unwrap_or_else(|_| "qw".to_owned());
+    let topic = env::var("IGGY_TOPIC").unwrap_or_else(|_| "records".to_owned());
+    let client = create_client(&address, &username, &password).await?;
+    let mut producer = client
+        .producer(&stream, &topic)?
+        .batch_size(1000)
+        .send_interval(
+            IggyDuration::from_str("5ms").expect("\"5ms\" is a valid duration literal"),
+        )
+        .partitioning(Partitioning::balanced())
+        .build();
+    producer.init().await?;
+
+    let mut rng = rand::rng();
+    // The batch-size distribution never changes, so it is built once up front.
+    let records_range = Uniform::new(500u32, 1000).expect("500..1000 is a non-empty range");
+    for _ in 0..100_000u32 {
+        let records_count = rng.sample(&records_range);
+        // Records that fail to serialize or to convert into a message are skipped.
+        let messages = (0..records_count)
+            .map(|_| random_record())
+            .filter_map(|record| serde_json::to_string(&record).ok())
+            .filter_map(|payload| IggyMessage::from_str(&payload).ok())
+            .collect::<Vec<_>>();
+        // Report what was actually handed to the producer, not what was requested.
+        let messages_count = messages.len();
+        producer.send(messages).await?;
+        info!("Sent {messages_count} messages");
+        sleep(Duration::from_millis(10)).await;
+    }
+
+    info!("Reached maximum batches count");
+    Ok(())
+}
+
+/// Builds an [`IggyClient`] from an `iggy://` connection string and connects it.
+///
+/// The connection string is assembled from `username`, `password` and
+/// `address`. Any error raised while parsing the connection string, building
+/// the client or connecting is returned to the caller.
+async fn create_client(
+    address: &str,
+    username: &str,
+    password: &str,
+) -> Result<IggyClient, IggyError> {
+    let builder = IggyClientBuilder::from_connection_string(&format!(
+        "iggy://{username}:{password}@{address}"
+    ))?;
+    let client = builder.build()?;
+    client.connect().await?;
+    Ok(client)
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct Record { // synthetic record; `main` serializes it to JSON and sends it as a message payload
+    user_id: String, // "user_N", with N drawn at random by `random_record`
+    user_type: u8, // random value drawn by `random_record`
+    email: String, // random alphanumeric local part followed by one of `DOMAINS`
+    source: String, // one of `SOURCES`
+    state: String, // one of `STATES`
+    created_at: DateTime<Utc>, // current UTC time minus a random number of days
+    message: String, // random alphanumeric text produced by `random_string`
+}
+
+/// Generates a single [`Record`] filled with pseudo-random values.
+///
+/// `source`, `state` and the e-mail domain are picked from the `SOURCES`,
+/// `STATES` and `DOMAINS` tables; the e-mail local part and the message are
+/// random alphanumeric strings; `created_at` is the current time moved back
+/// by fewer than 1000 days.
+fn random_record() -> Record {
+    let mut rng = rand::rng();
+    // Index ranges are derived from the table lengths directly, so no
+    // truncating `as u8` cast or fallible distribution construction is needed.
+    let source = SOURCES[rng.random_range(0..SOURCES.len())].to_owned();
+    let state = STATES[rng.random_range(0..STATES.len())].to_owned();
+    let email = format!(
+        "{}@{}",
+        random_string(rng.random_range(3..20)),
+        DOMAINS[rng.random_range(0..DOMAINS.len())]
+    );
+    let created_at = Utc::now()
+        .checked_sub_days(Days::new(rng.random_range(0..1000)))
+        .expect("subtracting fewer than 1000 days from the current time stays in range");
+    Record {
+        user_id: format!("user_{}", rng.random_range(1u32..100)),
+        user_type: rng.random_range(1u8..5),
+        email,
+        source,
+        state,
+        message: random_string(rng.random_range(10..100)),
+        created_at,
+    }
+}
+
+/// Returns a string made of `size` randomly sampled alphanumeric characters.
+fn random_string(size: usize) -> String {
+    rand::rng()
+        .sample_iter(Alphanumeric)
+        .take(size)
+        .map(char::from)
+        .collect()
+}
+
+#[derive(Debug, Error)]
+enum DataProducerError { // error type returned by `main`; each variant wraps an Iggy error via `#[from]`
+    #[error("Iggy client error")]
+    IggyClient(#[from] iggy::prelude::ClientError), // built from a `ClientError` when `?` propagates one
+    #[error("Iggy error")]
+    IggyError(#[from] iggy::prelude::IggyError), // built from an `IggyError` when `?` propagates one
+}
diff --git a/core/connectors/docker-compose.yml 
b/core/connectors/docker-compose.yml
new file mode 100644
index 00000000..5657e42e
--- /dev/null
+++ b/core/connectors/docker-compose.yml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  quickwit:
+    image: quickwit/quickwit:edge
+    container_name: quickwit
+    restart: unless-stopped
+    volumes:
+      - quickwit:/quickwit/qwdata
+    ports:
+      - 7280:7280
+      - 7281:7281
+    command: run
+    environment:
+      - QW_ENABLE_OTLP_ENDPOINT=true
+      - QW_ENABLE_JAEGER_ENDPOINT=true
+      - QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER=true
+      - QW_ENABLE_INGEST_V2=true
+      - OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:7281
+      - 
RUST_LOG=info,quickwit_actors=error,indexing_split_store=error,tantivy=error,quickwit_serve=warn,quickwit_indexing=warn,quickwit=warn
+
+volumes:
+  quickwit:
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
new file mode 100644
index 00000000..a975d0ae
--- /dev/null
+++ b/core/connectors/runtime/Cargo.toml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_runtime"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[dependencies]
+config = { workspace = true }
+dashmap = { workspace = true }
+dlopen2 = { workspace = true }
+flume = { workspace = true }
+futures = { workspace = true }
+iggy = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+mimalloc = { workspace = true }
+once_cell = { workspace = true }
+postcard = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
new file mode 100644
index 00000000..618120dd
--- /dev/null
+++ b/core/connectors/runtime/config.toml
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+token = "secret"
+
+[sources.test1]
+enabled = true
+name = "Test source"
+path = "target/release/libiggy_connector_test_source"
+
+[[sources.test1.streams]]
+stream = "example"
+topic = "topic1"
+schema = "json"
+batch_size = 1000
+send_interval = "5ms"
+
+[sources.test1.config]
+interval = "100ms"
+# max_count = 1000
+messages_range = [10, 50]
+payload_size = 200
+
+[sources.test1.transforms.add_fields]
+enabled = true
+
+[[sources.test1.transforms.add_fields.fields]]
+key = "test_field"
+value.static = "hello!"
+
+[sinks.quickwit]
+enabled = true
+name = "Quickwit sink 1"
+path = "target/release/libiggy_connector_quickwit_sink"
+
+[[sinks.quickwit.streams]]
+stream = "qw"
+topics = ["records"]
+schema = "json"
+batch_size = 1000
+poll_interval = "5ms"
+consumer_group = "qw_sink_connector"
+
+[sinks.quickwit.transforms.add_fields]
+enabled = true
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "service_name"
+value.static = "qw_connector"
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "timestamp"
+value.computed = "timestamp_millis"
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "random_id"
+value.computed = "uuid_v7"
+
+[sinks.quickwit.transforms.delete_fields]
+enabled = true
+fields = ["email", "created_at"]
+
+[sinks.quickwit.config]
+url = "http://localhost:7280"
+index = """
+version: 0.9
+
+index_id: events
+
+doc_mapping:
+  mode: strict
+  field_mappings:
+    - name: timestamp
+      type: datetime
+      input_formats: [unix_timestamp]
+      output_format: unix_timestamp_nanos
+      indexed: false
+      fast: true
+      fast_precision: milliseconds
+    - name: service_name
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: random_id
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: user_id
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: user_type
+      type: u64
+      fast: true
+    - name: source
+      type: text
+      tokenizer: default
+    - name: state
+      type: text
+      tokenizer: default
+    - name: message
+      type: text
+      tokenizer: default
+
+  timestamp_field: timestamp
+
+indexing_settings:
+  commit_timeout_secs: 10
+
+retention:
+  period: 7 days
+  schedule: daily
+"""
diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs
new file mode 100644
index 00000000..d7f7e664
--- /dev/null
+++ b/core/connectors/runtime/src/main.rs
@@ -0,0 +1,355 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use config::{Config, Environment, File};
+use dlopen2::wrapper::{Container, WrapperApi};
+use iggy::prelude::{Client, IggyClient, IggyClientBuilder, IggyConsumer, 
IggyError, IggyProducer};
+use iggy_connector_sdk::{
+    Schema, StreamDecoder, StreamEncoder,
+    sink::ConsumeCallback,
+    source::{HandleCallback, SendCallback},
+    transforms::{Transform, TransformType},
+};
+use mimalloc::MiMalloc;
+use serde::{Deserialize, Serialize};
+use std::{
+    collections::HashMap,
+    env,
+    sync::{Arc, atomic::AtomicU32},
+};
+use thiserror::Error;
+use tracing::{debug, error, info};
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
+
+mod sink;
+mod source;
+mod transform;
+
+// Use mimalloc as the allocator for the whole runtime process.
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+// Next plugin instance ID to hand out; starts at 1. `sink::init` reads it for
+// every enabled sink and increments it once that sink has been opened.
+static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
+
+// File extensions that `resolve_plugin_path` accepts as an already complete
+// shared-library path.
+const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
+
+/// Entry point of the connectors runtime.
+///
+/// Initializes tracing, loads the configuration from the file named by
+/// `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH` (default: `config`) overlaid with
+/// `IGGY`-prefixed environment variables, creates one Iggy client for sinks and
+/// one for sources, initializes and spawns all connectors, waits for a shutdown
+/// signal, and finally closes every plugin instance and both clients.
+///
+/// # Errors
+/// Returns [`RuntimeError::InvalidConfig`] when the configuration cannot be
+/// loaded or lacks the Iggy username/password, and propagates errors from the
+/// Iggy clients and from connector initialization.
+///
+/// # Panics
+/// Panics on Unix if the SIGINT/SIGTERM handlers cannot be installed.
+#[tokio::main]
+async fn main() -> Result<(), RuntimeError> {
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        // Build the fallback filter only when the environment provides none.
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+    let config_path =
+        env::var("IGGY_CONNECTORS_RUNTIME_CONFIG_PATH").unwrap_or_else(|_| "config".to_string());
+    info!("Starting Iggy Connector Runtime, loading configuration from: {config_path}...");
+
+    // A broken configuration is a user error, not a bug: report it and return
+    // an error instead of panicking.
+    let config: RuntimeConfig = Config::builder()
+        .add_source(File::with_name(&config_path))
+        .add_source(Environment::with_prefix("IGGY").separator("_"))
+        .build()
+        .and_then(|config| config.try_deserialize::<RuntimeConfig>())
+        .map_err(|error| {
+            error!("Failed to load configuration from: {config_path}. {error}");
+            RuntimeError::InvalidConfig
+        })?;
+
+    let iggy_address = config.iggy.address;
+    let Some(iggy_username) = config.iggy.username else {
+        error!("Iggy username must be set");
+        return Err(RuntimeError::InvalidConfig);
+    };
+    let Some(iggy_password) = config.iggy.password else {
+        error!("Iggy password must be set");
+        return Err(RuntimeError::InvalidConfig);
+    };
+
+    // `create_client` already connects, so no second `connect()` is needed here.
+    let consumer_client = create_client(&iggy_address, &iggy_username, &iggy_password).await?;
+    let producer_client = create_client(&iggy_address, &iggy_username, &iggy_password).await?;
+
+    let sources = source::init(config.sources, &producer_client).await?;
+    let sinks = sink::init(config.sinks, &consumer_client).await?;
+
+    // Split every connector in two: the wrapper (callback + plugins) is moved
+    // into the spawned tasks, while the container and the plugin IDs stay here
+    // so that each plugin instance can be closed on shutdown.
+    let mut sink_wrappers = vec![];
+    let mut sink_with_plugins = HashMap::new();
+    for (key, sink) in sinks {
+        let plugin_ids = sink.plugins.iter().map(|plugin| plugin.id).collect();
+        sink_wrappers.push(SinkConnectorWrapper {
+            callback: sink.container.consume,
+            plugins: sink.plugins,
+        });
+        sink_with_plugins.insert(
+            key,
+            SinkWithPlugins {
+                container: sink.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    let mut source_wrappers = vec![];
+    let mut source_with_plugins = HashMap::new();
+    for (key, source) in sources {
+        let plugin_ids = source.plugins.iter().map(|plugin| plugin.id).collect();
+        source_wrappers.push(SourceConnectorWrapper {
+            callback: source.container.handle,
+            plugins: source.plugins,
+        });
+        source_with_plugins.insert(
+            key,
+            SourceWithPlugins {
+                container: source.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    source::handle(source_wrappers);
+    sink::consume(sink_wrappers);
+    info!("All sources and sinks spawned.");
+
+    #[cfg(unix)]
+    let (mut ctrl_c, mut sigterm) = {
+        use tokio::signal::unix::{SignalKind, signal};
+        (
+            signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal"),
+            signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal"),
+        )
+    };
+
+    #[cfg(unix)]
+    tokio::select! {
+        _ = ctrl_c.recv() => {
+            info!("Received SIGINT. Shutting down connectors...");
+        },
+        _ = sigterm.recv() => {
+            info!("Received SIGTERM. Shutting down connectors...");
+        }
+    }
+
+    // Without this wait a non-Unix build would fall straight through and close
+    // the connectors immediately after spawning them.
+    #[cfg(not(unix))]
+    match tokio::signal::ctrl_c().await {
+        Ok(()) => info!("Received Ctrl+C. Shutting down connectors..."),
+        Err(error) => error!("Failed to listen for Ctrl+C. Shutting down connectors. {error}"),
+    }
+
+    for (key, source) in source_with_plugins {
+        for plugin_id in source.plugin_ids {
+            info!("Closing source connector with ID: {plugin_id} for plugin: {key}");
+            source.container.close(plugin_id);
+            info!("Closed source connector with ID: {plugin_id} for plugin: {key}");
+        }
+    }
+
+    for (key, sink) in sink_with_plugins {
+        for plugin_id in sink.plugin_ids {
+            info!("Closing sink connector with ID: {plugin_id} for plugin: {key}");
+            sink.container.close(plugin_id);
+            info!("Closed sink connector with ID: {plugin_id} for plugin: {key}");
+        }
+    }
+
+    producer_client.shutdown().await?;
+    consumer_client.shutdown().await?;
+
+    info!("All connectors closed.");
+    Ok(())
+}
+
+/// Returns the path of the shared library to load for a plugin.
+///
+/// If the file name in `path` already ends with one of
+/// `ALLOWED_PLUGIN_EXTENSIONS`, the path is returned unchanged. Otherwise the
+/// extension for the current OS is appended: `dll` on Windows, `dylib` on
+/// macOS and `so` everywhere else.
+pub fn resolve_plugin_path(path: &str) -> String {
+    // Inspect the real file extension. Splitting the whole string on '.' would
+    // mistake a dot-less path such as "so", or a hidden file such as ".so",
+    // for a path that already carries an extension.
+    let has_plugin_extension = std::path::Path::new(path)
+        .extension()
+        .and_then(|extension| extension.to_str())
+        .is_some_and(|extension| ALLOWED_PLUGIN_EXTENSIONS.contains(&extension));
+    if has_plugin_extension {
+        return path.to_string();
+    }
+
+    let os = std::env::consts::OS;
+    let os_extension = match os {
+        "windows" => "dll",
+        "macos" => "dylib",
+        _ => "so",
+    };
+
+    debug!("Resolved plugin path: {path}.{os_extension} for detected OS: {os}");
+    format!("{path}.{os_extension}")
+}
+
+/// Builds an Iggy client for `address` and connects it.
+///
+/// The credentials are interpolated verbatim into an
+/// `iggy://username:password@address` connection string.
+///
+/// # Errors
+/// Returns the `IggyError` raised while parsing the connection string,
+/// building the client or connecting.
+async fn create_client(
+    address: &str,
+    username: &str,
+    password: &str,
+) -> Result<IggyClient, IggyError> {
+    let builder = IggyClientBuilder::from_connection_string(&format!(
+        "iggy://{username}:{password}@{address}"
+    ))?;
+    let iggy_client = builder.build()?;
+    iggy_client.connect().await?;
+    Ok(iggy_client)
+}
+
+/// `extern "C"` entry points resolved from a source plugin library through
+/// dlopen2's `WrapperApi` derive.
+///
+/// Every function takes the runtime-assigned plugin instance ID and returns an
+/// `i32`. NOTE(review): the meaning of the returned value is not visible in
+/// this file; confirm against the SDK.
+#[derive(WrapperApi)]
+struct SourceApi {
+    // Receives the plugin configuration as a pointer/length pair.
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    // Receives the callback that `main` later stores as the source's `HandleCallback`.
+    handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
+    // Called by `main` for every plugin ID during shutdown.
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+/// `extern "C"` entry points resolved from a sink plugin library through
+/// dlopen2's `WrapperApi` derive.
+///
+/// Every function takes the runtime-assigned plugin instance ID and returns an
+/// `i32`. NOTE(review): the meaning of the returned value is not visible in
+/// this file; confirm against the SDK.
+#[derive(WrapperApi)]
+pub struct SinkApi {
+    // Receives the sink configuration serialized as JSON text (see `sink::init_sink`).
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    // Receives three postcard-encoded buffers as pointer/length pairs: topic
+    // metadata, messages metadata and the messages (see `sink::process_messages`).
+    #[allow(clippy::too_many_arguments)]
+    consume: extern "C" fn(
+        id: u32,
+        topic_meta_ptr: *const u8,
+        topic_meta_len: usize,
+        messages_meta_ptr: *const u8,
+        messages_meta_len: usize,
+        messages_ptr: *const u8,
+        messages_len: usize,
+    ) -> i32,
+    // Called by `main` for every plugin ID during shutdown.
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+/// A loaded sink plugin library together with the plugin instances opened on
+/// it. `sink::init` keeps one entry per resolved library path.
+struct SinkConnector {
+    // Loaded shared library exposing the `SinkApi` functions.
+    container: Container<SinkApi>,
+    // One entry per enabled sink configuration that uses this library.
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+/// A single opened sink plugin instance.
+struct SinkConnectorPlugin {
+    // Runtime-assigned ID passed to the plugin's `open`, `consume` and `close`.
+    id: u32,
+    // One consumer per configured stream/topic pair.
+    consumers: Vec<SinkConnectorConsumer>,
+}
+
+/// Everything one sink task needs to consume a single topic.
+struct SinkConnectorConsumer {
+    // Number of messages collected before a batch is handed to the plugin.
+    batch_size: u32,
+    // Initialized Iggy consumer for the topic.
+    consumer: IggyConsumer,
+    // Decoder obtained from the stream's configured schema.
+    decoder: Arc<dyn StreamDecoder>,
+    // Transforms applied, in order, to every decoded message.
+    transforms: Vec<Arc<dyn Transform>>,
+}
+
+/// Sink data handed to `sink::consume`: the library's `consume` function plus
+/// the plugin instances (and their consumers) that feed it.
+struct SinkConnectorWrapper {
+    // Copied from `container.consume` in `main`.
+    callback: ConsumeCallback,
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+/// Sink library handle retained by `main` so that every plugin instance can be
+/// closed on shutdown.
+struct SinkWithPlugins {
+    container: Container<SinkApi>,
+    // IDs of the plugin instances opened on this library.
+    plugin_ids: Vec<u32>,
+}
+
+/// A loaded source plugin library together with the plugin instances opened
+/// on it; returned by `source::init`.
+struct SourceConnector {
+    // Loaded shared library exposing the `SourceApi` functions.
+    container: Container<SourceApi>,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+/// A single opened source plugin instance.
+struct SourceConnectorPlugin {
+    // Runtime-assigned ID; `main` passes it to the plugin's `close` on shutdown.
+    id: u32,
+    // Transforms attached to this source.
+    transforms: Vec<Arc<dyn Transform>>,
+    // NOTE(review): presumably `None` when no producer was configured for the
+    // source; the code that fills this field is not visible here.
+    producer: Option<SourceConnectorProducer>,
+}
+
+/// Pairs an Iggy producer with the encoder used for its messages.
+struct SourceConnectorProducer {
+    encoder: Arc<dyn StreamEncoder>,
+    producer: IggyProducer,
+}
+
+/// Source library handle retained by `main` so that every plugin instance can
+/// be closed on shutdown.
+struct SourceWithPlugins {
+    container: Container<SourceApi>,
+    // IDs of the plugin instances opened on this library.
+    plugin_ids: Vec<u32>,
+}
+
+/// Source data handed to `source::handle`: the library's `handle` function
+/// plus the plugin instances it serves.
+struct SourceConnectorWrapper {
+    // Copied from `container.handle` in `main`.
+    callback: HandleCallback,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+/// Root of the runtime configuration deserialized in `main`.
+#[derive(Debug, Deserialize, Serialize)]
+struct RuntimeConfig {
+    // Connection settings for the Iggy server.
+    iggy: IggyConfig,
+    // Sink connectors keyed by their configuration key (e.g. `[sinks.quickwit]`).
+    sinks: HashMap<String, SinkConfig>,
+    // Source connectors keyed by their configuration key (e.g. `[sources.test1]`).
+    sources: HashMap<String, SourceConfig>,
+}
+
+/// Connection settings for the Iggy server.
+#[derive(Debug, Serialize, Deserialize)]
+struct IggyConfig {
+    // Server address placed into the connection string, e.g. "localhost:8090".
+    address: String,
+    // Optional in the schema, but `main` refuses to start without it.
+    username: Option<String>,
+    // Optional in the schema, but `main` refuses to start without it.
+    password: Option<String>,
+    // NOTE(review): not read by `main` in this file.
+    token: Option<String>,
+}
+
+/// Configuration of one sink connector.
+#[derive(Debug, Serialize, Deserialize)]
+struct SinkConfig {
+    // Disabled sinks are skipped by `sink::init` with a warning.
+    enabled: bool,
+    // Human-readable name used in log messages.
+    name: String,
+    // Path to the plugin library; the OS-specific extension may be omitted
+    // (see `resolve_plugin_path`).
+    path: String,
+    // Optional transforms applied to every decoded message.
+    transforms: Option<TransformsConfig>,
+    // Streams and topics to consume from.
+    streams: Vec<StreamConsumerConfig>,
+    // Plugin-specific settings, passed to the plugin's `open` as JSON.
+    config: Option<serde_json::Value>,
+}
+
+/// One stream a sink consumes from, together with its topics.
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamConsumerConfig {
+    stream: String,
+    // A separate consumer is created for every topic listed here.
+    topics: Vec<String>,
+    // Selects the decoder applied to message payloads.
+    schema: Schema,
+    // `sink::init` falls back to 1000 when unset.
+    batch_size: Option<u32>,
+    // Duration string; `sink::init` falls back to "5ms" when unset.
+    poll_interval: Option<String>,
+    // `sink::init` falls back to "iggy-connect-{key}" when unset.
+    consumer_group: Option<String>,
+}
+
+/// The stream and topic a source produces to.
+// NOTE(review): defaults for the optional fields are applied outside the code
+// visible here (presumably in `source::init`); confirm there.
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamProducerConfig {
+    stream: String,
+    topic: String,
+    schema: Schema,
+    batch_size: Option<u32>,
+    // Duration string, e.g. "5ms" in the sample configuration.
+    send_interval: Option<String>,
+}
+
+/// Configuration of one source connector; mirrors `SinkConfig` but with
+/// producer streams.
+#[derive(Debug, Serialize, Deserialize)]
+struct SourceConfig {
+    enabled: bool,
+    // Human-readable name used in log messages.
+    name: String,
+    // Path to the plugin library.
+    path: String,
+    transforms: Option<TransformsConfig>,
+    streams: Vec<StreamProducerConfig>,
+    // Plugin-specific settings.
+    config: Option<serde_json::Value>,
+}
+
+/// Transform settings of a connector, keyed by transform type.
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct TransformsConfig {
+    // Flattened, so each transform is configured directly under `transforms`
+    // (e.g. `transforms.add_fields`); the value is kept as raw JSON.
+    #[serde(flatten)]
+    transforms: HashMap<TransformType, serde_json::Value>,
+}
+
+/// Settings common to every transform configuration.
+// NOTE(review): not used in the code visible here; presumably read by
+// `transform::load`.
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct SharedTransformConfig {
+    enabled: bool,
+}
+
+/// Errors raised by the connectors runtime.
+///
+/// The last three variants wrap errors of the connector SDK and the Iggy
+/// client and are created automatically by `?` through `#[from]`.
+#[derive(Debug, Error)]
+pub enum RuntimeError {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    // The four serialization variants below are returned by `sink::process_messages`.
+    #[error("Failed to serialize topic metadata")]
+    FailedToSerializeTopicMetadata,
+    #[error("Failed to serialize messages metadata")]
+    FailedToSerializeMessagesMetadata,
+    #[error("Failed to serialize raw messages")]
+    FailedToSerializeRawMessages,
+    #[error("Failed to serialize headers")]
+    FailedToSerializeHeaders,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("Invalid transform")]
+    InvalidTransform,
+    #[error("Invalid sink")]
+    InvalidSink,
+    #[error("Connector SDK error")]
+    ConnectorSdkError(#[from] iggy_connector_sdk::Error),
+    #[error("Iggy client error")]
+    IggyClient(#[from] iggy::prelude::ClientError),
+    #[error("Iggy error")]
+    IggyError(#[from] iggy::prelude::IggyError),
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
new file mode 100644
index 00000000..121a39f4
--- /dev/null
+++ b/core/connectors/runtime/src/sink.rs
@@ -0,0 +1,359 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dlopen2::wrapper::Container;
+use futures::StreamExt;
+use iggy::prelude::{
+    AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, 
IggyMessage,
+    PollingStrategy,
+};
+use iggy_connector_sdk::{
+    DecodedMessage, MessagesMetadata, RawMessage, RawMessages, 
ReceivedMessage, StreamDecoder,
+    TopicMetadata, sink::ConsumeCallback, transforms::Transform,
+};
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+    time::Instant,
+};
+use tracing::{error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SinkApi, SinkConfig, SinkConnector, 
SinkConnectorConsumer,
+    SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform,
+};
+
+/// Loads and opens every enabled sink plugin and creates its Iggy consumers.
+///
+/// Plugin libraries are loaded once per resolved path; several sink
+/// configurations pointing at the same library share one container and each
+/// get their own plugin ID. For every configured stream/topic pair a consumer
+/// group consumer is built and initialized.
+///
+/// Returns the loaded connectors keyed by resolved plugin path.
+///
+/// # Errors
+/// - [`RuntimeError::InvalidSink`] when a plugin library cannot be loaded.
+/// - [`RuntimeError::InvalidTransform`] when the transforms cannot be loaded.
+/// - [`RuntimeError::InvalidConfig`] when a poll interval cannot be parsed.
+/// - Any Iggy error raised while building or initializing a consumer.
+pub async fn init(
+    sink_configs: HashMap<String, SinkConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
+    let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
+    for (key, config) in sink_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Sink: {name} is disabled ({key})");
+            continue;
+        }
+
+        // Reserve the ID in one atomic step instead of a separate load and add.
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        info!("Initializing sink container with name: {name} ({key}), plugin: {path}",);
+        let connector = match sink_connectors.entry(path.clone()) {
+            std::collections::hash_map::Entry::Occupied(entry) => {
+                info!("Sink container for plugin: {path} is already loaded.",);
+                entry.into_mut()
+            }
+            std::collections::hash_map::Entry::Vacant(entry) => {
+                // SAFETY: loading a shared library runs foreign code. The runtime
+                // trusts the plugin paths from its own configuration and relies on
+                // the library exporting the `SinkApi` symbols with matching signatures.
+                let container: Container<SinkApi> = unsafe { Container::load(&path) }
+                    .map_err(|error| {
+                        error!("Failed to load sink container for plugin: {path}. {error}");
+                        RuntimeError::InvalidSink
+                    })?;
+                info!("Sink container for plugin: {path} loaded successfully.",);
+                entry.insert(SinkConnector {
+                    container,
+                    plugins: vec![],
+                })
+            }
+        };
+
+        init_sink(
+            &connector.container,
+            &config.config.unwrap_or_default(),
+            plugin_id,
+        );
+        info!(
+            "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+        );
+
+        let transforms = match config.transforms {
+            Some(transforms_config) => {
+                let transforms = transform::load(transforms_config).map_err(|error| {
+                    error!("Failed to load transforms for sink: {name} ({key}). {error:?}");
+                    RuntimeError::InvalidTransform
+                })?;
+                let types = transforms
+                    .iter()
+                    .map(|t| t.r#type().into())
+                    .collect::<Vec<&'static str>>()
+                    .join(", ");
+                info!("Enabled transforms for sink: {name} ({key}): {types}",);
+                transforms
+            }
+            None => vec![],
+        };
+
+        // Collect the consumers first and register the plugin once all of them
+        // have been initialized.
+        let mut consumers = vec![];
+        for stream in config.streams {
+            let poll_interval =
+                IggyDuration::from_str(stream.poll_interval.as_deref().unwrap_or("5ms"))
+                    .map_err(|error| {
+                        error!("Invalid poll interval for sink: {name} ({key}). {error:?}");
+                        RuntimeError::InvalidConfig
+                    })?;
+            let consumer_group = stream
+                .consumer_group
+                .unwrap_or_else(|| format!("iggy-connect-{key}"));
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            for topic in stream.topics {
+                let mut consumer = iggy_client
+                    .consumer_group(&consumer_group, &stream.stream, &topic)?
+                    .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                    .create_consumer_group_if_not_exists()
+                    .auto_join_consumer_group()
+                    .polling_strategy(PollingStrategy::next())
+                    .poll_interval(poll_interval)
+                    .batch_size(batch_size)
+                    .build();
+
+                consumer.init().await?;
+                consumers.push(SinkConnectorConsumer {
+                    consumer,
+                    decoder: stream.schema.decoder(),
+                    batch_size,
+                    transforms: transforms.clone(),
+                });
+            }
+        }
+
+        connector.plugins.push(SinkConnectorPlugin {
+            id: plugin_id,
+            consumers,
+        });
+    }
+
+    Ok(sink_connectors)
+}
+
+/// Spawns one Tokio task per sink consumer.
+///
+/// Each task runs `consume_messages` for its consumer and logs how it ended.
+/// The tasks are detached: their join handles are dropped.
+pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
+    for sink in sinks {
+        let callback = sink.callback;
+        for plugin in sink.plugins {
+            let plugin_id = plugin.id;
+            info!("Starting consume for sink with ID: {}...", plugin_id);
+            for consumer in plugin.consumers {
+                tokio::spawn(async move {
+                    let result = consume_messages(
+                        plugin_id,
+                        consumer.decoder,
+                        consumer.batch_size,
+                        callback,
+                        consumer.transforms,
+                        consumer.consumer,
+                    )
+                    .await;
+                    match result {
+                        // `consume_messages` only returns once the message stream
+                        // has ended, so this reports completion rather than startup.
+                        Ok(()) => info!(
+                            "Consume messages for sink connector with ID: {} finished.",
+                            plugin_id
+                        ),
+                        Err(error) => error!(
+                            "Failed to consume messages for sink connector with ID: {}. {error}",
+                            plugin_id
+                        ),
+                    }
+                });
+            }
+        }
+    }
+}
+
+/// Polls `consumer` until its stream ends and forwards the messages to the
+/// sink plugin in batches.
+///
+/// Messages are buffered until either `batch_size` messages are collected or
+/// the received message's offset equals the consumer's reported current
+/// offset. The batch is then passed to `process_messages`. A batch that fails
+/// to process is logged and dropped; consuming continues with the next
+/// message.
+///
+/// # Errors
+/// The visible code paths always return `Ok(())`; failures are logged.
+async fn consume_messages(
+    plugin_id: u32,
+    decoder: Arc<dyn StreamDecoder>,
+    batch_size: u32,
+    consume: ConsumeCallback,
+    transforms: Vec<Arc<dyn Transform>>,
+    mut consumer: IggyConsumer,
+) -> Result<(), RuntimeError> {
+    info!("Started consuming messages for sink connector with ID: {plugin_id}");
+    let batch_size = batch_size as usize;
+    let mut batch = Vec::with_capacity(batch_size);
+    let topic_metadata = TopicMetadata {
+        stream: consumer.stream().to_string(),
+        topic: consumer.topic().to_string(),
+    };
+
+    while let Some(message) = consumer.next().await {
+        let message = match message {
+            Ok(message) => message,
+            Err(error) => {
+                // Keep the cause in the log instead of discarding it.
+                error!(
+                    "Failed to receive message for sink connector with ID: {plugin_id}. {error}"
+                );
+                continue;
+            }
+        };
+
+        let partition_id = message.partition_id;
+        let current_offset = message.current_offset;
+        let message_offset = message.message.header.offset;
+        batch.push(message.message);
+        // Keep buffering while more messages are pending and the batch is not full.
+        if current_offset != message_offset && batch.len() < batch_size {
+            continue;
+        }
+
+        // Hand the collected messages over and leave an empty buffer behind.
+        let messages = std::mem::take(&mut batch);
+        let messages_count = messages.len();
+        let messages_metadata = MessagesMetadata {
+            partition_id,
+            current_offset,
+            schema: decoder.schema(),
+        };
+        info!(
+            "Processing {messages_count} messages for sink connector with ID: {}",
+            plugin_id
+        );
+        let start = Instant::now();
+        if let Err(error) = process_messages(
+            plugin_id,
+            messages_metadata,
+            &topic_metadata,
+            messages,
+            &consume,
+            &transforms,
+            &decoder,
+        )
+        .await
+        {
+            error!(
+                "Failed to process {messages_count} messages for sink connector with ID: {plugin_id}. {error}",
+            );
+            continue;
+        }
+
+        let elapsed = start.elapsed();
+        info!(
+            "Consumed {messages_count} messages in {:#?} for sink connector with ID: {plugin_id}",
+            elapsed
+        );
+    }
+    info!("Stopped consuming messages for sink connector with ID: {plugin_id}");
+    Ok(())
+}
+
+/// Opens the sink plugin instance `id` by calling the library's `open`
+/// function with `config` serialized as JSON text.
+///
+/// The serialized buffer is owned by this function, so the pointer handed to
+/// the plugin is only valid for the duration of the call. The `i32` returned
+/// by `open` is discarded.
+///
+/// # Panics
+/// Panics if `config` cannot be serialized to JSON.
+fn init_sink(container: &Container<SinkApi>, config: &serde_json::Value, id: u32) {
+    let serialized = serde_json::to_string(config).expect("Invalid sink config.");
+    let bytes = serialized.as_bytes();
+    (container.open)(id, bytes.as_ptr(), bytes.len());
+}
+
+/// Decodes, transforms and re-serializes one batch of messages and passes it
+/// to the sink plugin's `consume` function.
+///
+/// Per message: the payload is decoded with `decoder`, every transform is
+/// applied in order (a transform may drop the message by returning `None`),
+/// and the result is converted into a `RawMessage`. Messages that cannot be
+/// decoded, have no offset, or whose payload cannot be turned into bytes are
+/// logged and skipped. Topic metadata, messages metadata and the messages are
+/// then postcard-encoded and handed to the plugin as pointer/length pairs.
+/// The buffers stay alive until the call returns, and the `i32` returned by
+/// the plugin is discarded.
+///
+/// # Errors
+/// Returns an error, without calling the plugin, when a transform fails or
+/// when headers, topic metadata, messages metadata or the messages cannot be
+/// serialized.
+async fn process_messages(
+    plugin_id: u32,
+    messages_metadata: MessagesMetadata,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<IggyMessage>,
+    consume: &ConsumeCallback,
+    transforms: &[Arc<dyn Transform>],
+    decoder: &Arc<dyn StreamDecoder>,
+) -> Result<(), RuntimeError> {
+    let mut raw_messages = Vec::with_capacity(messages.len());
+    for message in messages {
+        let received = ReceivedMessage {
+            id: message.header.id,
+            offset: message.header.offset,
+            headers: message.user_headers_map().unwrap_or_default(),
+            payload: message.payload.into(),
+        };
+
+        let Ok(payload) = decoder.decode(received.payload) else {
+            error!(
+                "Failed to decode message at offset: {} for sink connector with ID: {plugin_id}",
+                received.offset
+            );
+            continue;
+        };
+
+        let mut current_message = Some(DecodedMessage {
+            id: Some(received.id),
+            offset: Some(received.offset),
+            headers: received.headers,
+            payload,
+        });
+        for transform in transforms {
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let Some(offset) = message.offset else {
+            error!(
+                "Offset should be present. Failed to process message for sink connector with ID: {plugin_id}"
+            );
+            continue;
+        };
+
+        let Ok(payload) = message.payload.try_into_vec() else {
+            error!("Failed to get message payload for sink connector with ID: {plugin_id}");
+            continue;
+        };
+
+        // A header serialization failure aborts the whole batch, exactly as
+        // before; the former "skip this message" branch could never run.
+        let headers = match message.headers {
+            Some(headers) => postcard::to_allocvec(&headers).map_err(|error| {
+                error!("Failed to serialize headers. {error}");
+                RuntimeError::FailedToSerializeHeaders
+            })?,
+            None => vec![],
+        };
+
+        raw_messages.push(RawMessage {
+            offset,
+            headers,
+            payload,
+        });
+    }
+
+    let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| {
+        error!("Failed to serialize topic metadata. {error}");
+        RuntimeError::FailedToSerializeTopicMetadata
+    })?;
+
+    let messages_meta = postcard::to_allocvec(&messages_metadata).map_err(|error| {
+        error!("Failed to serialize messages metadata. {error}");
+        RuntimeError::FailedToSerializeMessagesMetadata
+    })?;
+
+    let messages = postcard::to_allocvec(&RawMessages {
+        schema: decoder.schema(),
+        messages: raw_messages,
+    })
+    .map_err(|error| {
+        error!("Failed to serialize messages for sink connector with ID: {plugin_id}. {error}");
+        RuntimeError::FailedToSerializeRawMessages
+    })?;
+
+    (consume)(
+        plugin_id,
+        topic_meta.as_ptr(),
+        topic_meta.len(),
+        messages_meta.as_ptr(),
+        messages_meta.len(),
+        messages.as_ptr(),
+        messages.len(),
+    );
+
+    Ok(())
+}
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
new file mode 100644
index 00000000..b9619827
--- /dev/null
+++ b/core/connectors/runtime/src/source.rs
@@ -0,0 +1,301 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dashmap::DashMap;
+use dlopen2::wrapper::Container;
+use flume::{Receiver, Sender};
+use iggy::prelude::{HeaderKey, HeaderValue, IggyClient, IggyDuration, 
IggyError, IggyMessage};
+use iggy_connector_sdk::{
+    DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, 
transforms::Transform,
+};
+use once_cell::sync::Lazy;
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+};
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SourceApi, SourceConfig, SourceConnector, 
SourceConnectorPlugin,
+    SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path, 
transform,
+};
+
+/// Registry of channel senders keyed by plugin ID. `handle` inserts one sender per source
+/// plugin and `handle_produced_messages` looks the sender up to forward produced batches.
+pub static SOURCE_SENDERS: Lazy<DashMap<u32, Sender<ProducedMessages>>> = 
Lazy::new(DashMap::new);
+
+/// Loads and initializes every enabled source connector described in `source_configs`.
+///
+/// The plugin library of each enabled config is resolved with `resolve_plugin_path` and
+/// loaded once per path; further configs pointing at the same path reuse the already loaded
+/// container and only register an additional plugin instance. Every instance gets a unique
+/// ID reserved from `PLUGIN_ID`, its configured transforms, and a producer that is built
+/// and initialized for the configured stream.
+///
+/// Returns the loaded connectors keyed by the resolved plugin path.
+///
+/// # Errors
+/// Returns a `RuntimeError` when the transforms cannot be loaded, when a send interval is
+/// not a valid duration (`RuntimeError::InvalidConfig`), or when a producer cannot be
+/// created or initialized.
+///
+/// # Panics
+/// Panics if the plugin library cannot be loaded or the plugin configuration cannot be
+/// serialized.
+pub async fn init(
+    source_configs: HashMap<String, SourceConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
+    let mut source_connectors: HashMap<String, SourceConnector> = HashMap::new();
+    for (key, config) in source_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Source: {name} is disabled ({key})");
+            continue;
+        }
+
+        // Reserve the ID in a single atomic step so the same value is never handed out twice.
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        let plugin_config = config.config.unwrap_or_default();
+        info!("Initializing source container with name: {name} ({key}), plugin: {path}",);
+        if let Some(container) = source_connectors.get_mut(&path) {
+            info!("Source container for plugin: {path} is already loaded.",);
+            init_source(&container.container, &plugin_config, plugin_id);
+            container.plugins.push(SourceConnectorPlugin {
+                id: plugin_id,
+                producer: None,
+                transforms: vec![],
+            });
+        } else {
+            let container: Container<SourceApi> =
+                unsafe { Container::load(&path).expect("Failed to load source container") };
+            info!("Source container for plugin: {path} loaded successfully.",);
+            init_source(&container, &plugin_config, plugin_id);
+            source_connectors.insert(
+                path.to_owned(),
+                SourceConnector {
+                    container,
+                    plugins: vec![SourceConnectorPlugin {
+                        id: plugin_id,
+                        producer: None,
+                        transforms: vec![],
+                    }],
+                },
+            );
+        }
+
+        info!(
+            "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+        );
+
+        // A broken transform configuration is a user error: report it instead of panicking.
+        let transforms = match config.transforms {
+            Some(transforms_config) => {
+                let transforms = transform::load(transforms_config)?;
+                let types = transforms
+                    .iter()
+                    .map(|t| t.r#type().into())
+                    .collect::<Vec<&'static str>>()
+                    .join(", ");
+                info!("Enabled transforms for source: {name} ({key}): {types}",);
+                transforms
+            }
+            None => vec![],
+        };
+
+        let connector = source_connectors
+            .get_mut(&path)
+            .expect("Failed to get source connector");
+        let plugin = connector
+            .plugins
+            .iter_mut()
+            .find(|p| p.id == plugin_id)
+            .expect("Failed to get source plugin");
+
+        // NOTE(review): every stream overwrites `plugin.producer`, so only the producer of the
+        // last configured stream is kept - confirm whether multiple streams are meant to work.
+        for stream in config.streams {
+            let send_interval = stream.send_interval.as_deref().unwrap_or("5ms");
+            let send_interval = IggyDuration::from_str(send_interval).map_err(|_| {
+                error!("Invalid send interval: {send_interval} for source: {name} ({key})");
+                RuntimeError::InvalidConfig
+            })?;
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            let mut producer = iggy_client
+                .producer(&stream.stream, &stream.topic)?
+                .send_interval(send_interval)
+                .batch_size(batch_size)
+                .build();
+
+            producer.init().await?;
+            plugin.producer = Some(SourceConnectorProducer {
+                producer,
+                encoder: stream.schema.encoder(),
+            });
+        }
+
+        // Assigned once, outside of the streams loop: no clone per stream is needed.
+        plugin.transforms = transforms;
+    }
+
+    Ok(source_connectors)
+}
+
+/// Serializes `config` to a JSON string and passes it, as a pointer/length pair together
+/// with the plugin `id`, to the `open` entry point of the loaded container.
+///
+/// # Panics
+/// Panics with "Invalid source config." if the configuration cannot be serialized.
+fn init_source(container: &Container<SourceApi>, config: &serde_json::Value, id: u32) {
+    let serialized = serde_json::to_string(config).expect("Invalid source config.");
+    let (config_ptr, config_len) = (serialized.as_ptr(), serialized.len());
+    (container.open)(id, config_ptr, config_len);
+}
+
+/// Starts the processing pipeline of every plugin of every source connector.
+///
+/// For each plugin a channel is created and its sender is registered in `SOURCE_SENDERS`,
+/// the plugin callback is started on a blocking task (it reports batches through
+/// `handle_produced_messages`), and an async task is spawned that receives the batches,
+/// decodes the payloads, runs them through `process_messages` and sends the result with the
+/// plugin's producer.
+///
+/// A plugin without an initialized producer is logged and skipped.
+pub fn handle(sources: Vec<SourceConnectorWrapper>) {
+    for source in sources {
+        for plugin in source.plugins {
+            let plugin_id = plugin.id;
+            let Some(source_producer) = plugin.producer else {
+                error!(
+                    "Producer not initialized for source connector with ID: {plugin_id}, the handler will not be started."
+                );
+                continue;
+            };
+            let transforms = plugin.transforms;
+
+            // The sender has to be registered before the plugin callback is started, otherwise
+            // the batches produced in the meantime would be silently dropped by
+            // `handle_produced_messages`.
+            let (sender, receiver): (Sender<ProducedMessages>, Receiver<ProducedMessages>) =
+                flume::unbounded();
+            SOURCE_SENDERS.insert(plugin_id, sender);
+
+            info!("Starting handler for source connector with ID: {plugin_id}...");
+            let handle = source.callback;
+            tokio::task::spawn_blocking(move || {
+                handle(plugin_id, handle_produced_messages);
+            });
+            info!("Handler for source connector with ID: {plugin_id} started successfully.");
+
+            tokio::spawn(async move {
+                info!("Source {plugin_id} started");
+                let encoder = &source_producer.encoder;
+                let producer = &source_producer.producer;
+                // Running counter used only for debug logging; wide enough for a long-lived task.
+                let mut number: u64 = 1;
+
+                let topic_metadata = TopicMetadata {
+                    stream: producer.stream().to_string(),
+                    topic: producer.topic().to_string(),
+                };
+
+                while let Ok(received_messages) = receiver.recv_async().await {
+                    let count = received_messages.messages.len();
+                    info!("Source connector with ID: {plugin_id} received {count} messages",);
+                    let schema = received_messages.schema;
+                    let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count);
+                    for message in received_messages.messages {
+                        // A payload that cannot be decoded is skipped, the rest of the batch
+                        // is still processed.
+                        let Ok(payload) = schema.try_into_payload(message.payload) else {
+                            error!("Failed to decode message payload with schema: {}", schema);
+                            continue;
+                        };
+
+                        debug!(
+                            "[Source {plugin_id}] Message: {number} | Schema: {schema} | Payload: {payload}"
+                        );
+                        messages.push(DecodedMessage {
+                            id: message.id,
+                            offset: None,
+                            headers: message.headers,
+                            payload,
+                        });
+                        number += 1;
+                    }
+
+                    let Ok(iggy_messages) =
+                        process_messages(encoder, &topic_metadata, messages, &transforms)
+                    else {
+                        error!(
+                            "Failed to process {count} messages for source connector with ID: {plugin_id} before sending them to stream: {}, topic: {}.",
+                            producer.stream(),
+                            producer.topic()
+                        );
+                        continue;
+                    };
+
+                    if let Err(error) = producer.send(iggy_messages).await {
+                        error!(
+                            "Failed to send {count} messages to stream: {}, topic: {} for source connector with ID: {plugin_id}. {error}",
+                            producer.stream(),
+                            producer.topic(),
+                        );
+                        continue;
+                    }
+
+                    info!(
+                        "Sent {count} messages to stream: {}, topic: {} by source connector with ID: {plugin_id}",
+                        producer.stream(),
+                        producer.topic()
+                    );
+                }
+            });
+        }
+    }
+}
+
+/// Turns decoded messages into Iggy messages ready to be sent.
+///
+/// Every message is passed through `transforms` in order, then its payload is encoded with
+/// `encoder` and wrapped by `build_iggy_message`. A message for which a transform returns
+/// `None`, whose payload cannot be encoded, or which cannot be built is left out of the
+/// result.
+///
+/// # Errors
+/// Returns the error of the first transform that fails; the whole batch is rejected then.
+fn process_messages(
+    encoder: &Arc<dyn StreamEncoder>,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<DecodedMessage>,
+    transforms: &[Arc<dyn Transform>],
+) -> Result<Vec<IggyMessage>, Error> {
+    let mut iggy_messages = Vec::with_capacity(messages.len());
+    for message in messages {
+        let mut current_message = Some(message);
+        for transform in transforms {
+            // Once a transform has dropped the message, there is nothing left to transform.
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let Ok(payload) = encoder.encode(message.payload) else {
+            error!("Failed to encode message payload");
+            continue;
+        };
+
+        let Ok(iggy_message) = build_iggy_message(payload, message.id, message.headers) else {
+            error!("Failed to build Iggy message");
+            continue;
+        };
+
+        iggy_messages.push(iggy_message);
+    }
+    Ok(iggy_messages)
+}
+
+/// Callback handed to source plugins: receives a serialized batch of produced messages and
+/// forwards it to the channel registered for `plugin_id` in `SOURCE_SENDERS`.
+///
+/// The bytes at `messages_ptr` (`messages_len` of them) are deserialized with `postcard`
+/// into `ProducedMessages`. Nothing happens when no sender is registered for the plugin;
+/// a null pointer or bytes that cannot be deserialized are reported on stderr and the
+/// batch is dropped. A failure to send the batch is ignored.
+extern "C" fn handle_produced_messages(
+    plugin_id: u32,
+    messages_ptr: *const u8,
+    messages_len: usize,
+) {
+    let Some(sender) = SOURCE_SENDERS.get(&plugin_id) else {
+        return;
+    };
+
+    // `slice::from_raw_parts` requires a non-null pointer even for an empty slice.
+    if messages_ptr.is_null() {
+        eprintln!(
+            "Received a null messages pointer from source connector with ID: {plugin_id}"
+        );
+        return;
+    }
+
+    // SAFETY: the pointer is non-null (checked above); the plugin is required to pass a
+    // pointer to `messages_len` initialized bytes that stay valid and unmodified for the
+    // duration of this call. The slice is not used after the function returns.
+    let messages = unsafe { std::slice::from_raw_parts(messages_ptr, messages_len) };
+    let Ok(messages) = postcard::from_bytes::<ProducedMessages>(messages) else {
+        eprintln!(
+            "Failed to deserialize produced messages for source connector with ID: {plugin_id}"
+        );
+        return;
+    };
+    let _ = sender.send(messages);
+}
+
+/// Builds an `IggyMessage` from an encoded payload, setting the message ID and the user
+/// headers only when they are provided.
+fn build_iggy_message(
+    payload: Vec<u8>,
+    id: Option<u128>,
+    headers: Option<HashMap<HeaderKey, HeaderValue>>,
+) -> Result<IggyMessage, IggyError> {
+    match headers {
+        // Message with user headers, with or without an explicit ID.
+        Some(user_headers) => match id {
+            Some(message_id) => IggyMessage::builder()
+                .payload(payload.into())
+                .id(message_id)
+                .user_headers(user_headers)
+                .build(),
+            None => IggyMessage::builder()
+                .payload(payload.into())
+                .user_headers(user_headers)
+                .build(),
+        },
+        // Message without user headers, with or without an explicit ID.
+        None => match id {
+            Some(message_id) => IggyMessage::builder()
+                .payload(payload.into())
+                .id(message_id)
+                .build(),
+            None => IggyMessage::builder().payload(payload.into()).build(),
+        },
+    }
+}
diff --git a/core/connectors/runtime/src/transform.rs 
b/core/connectors/runtime/src/transform.rs
new file mode 100644
index 00000000..b43a3268
--- /dev/null
+++ b/core/connectors/runtime/src/transform.rs
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use iggy_connector_sdk::transforms::{
+    Transform, TransformType, add_fields::AddFields, 
delete_fields::DeleteFields,
+};
+use serde::de::DeserializeOwned;
+use tracing::error;
+
+use crate::{RuntimeError, SharedTransformConfig, TransformsConfig};
+
+/// Instantiates the transforms listed in `config`.
+///
+/// Each entry is first read as a `SharedTransformConfig` (falling back to its default when
+/// that fails) and is skipped unless it is enabled; the remaining entries are built with
+/// `load_transform`.
+///
+/// # Errors
+/// Returns the error of the first transform that cannot be built.
+pub fn load(config: TransformsConfig) -> Result<Vec<Arc<dyn Transform>>, RuntimeError> {
+    config
+        .transforms
+        .into_iter()
+        .filter(|(_, transform_config)| {
+            serde_json::from_value::<SharedTransformConfig>(transform_config.clone())
+                .unwrap_or_default()
+                .enabled
+        })
+        .map(|(kind, transform_config)| load_transform(kind, transform_config))
+        .collect()
+}
+
+/// Builds the transform of the given type from its JSON configuration.
+///
+/// # Errors
+/// Returns the error of `as_config` when `config` does not match the transform's settings.
+fn load_transform(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<Arc<dyn Transform>, RuntimeError> {
+    let transform: Arc<dyn Transform> = match r#type {
+        TransformType::AddFields => {
+            let settings = as_config(r#type, config)?;
+            Arc::new(AddFields::new(settings))
+        }
+        TransformType::DeleteFields => {
+            let settings = as_config(r#type, config)?;
+            Arc::new(DeleteFields::new(settings))
+        }
+    };
+    Ok(transform)
+}
+
+/// Deserializes the JSON `config` into the settings type `T` of a transform.
+///
+/// # Errors
+/// Logs the failure together with the transform type and returns
+/// `RuntimeError::InvalidConfig` when the value cannot be deserialized.
+fn as_config<T: DeserializeOwned>(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<T, RuntimeError> {
+    match serde_json::from_value::<T>(config) {
+        Ok(settings) => Ok(settings),
+        Err(error) => {
+            error!("Failed to deserialize config for transform: {type}. {error}");
+            Err(RuntimeError::InvalidConfig)
+        }
+    }
+}
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
new file mode 100644
index 00000000..7c96ed3c
--- /dev/null
+++ b/core/connectors/sdk/Cargo.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_sdk"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[dependencies]
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = { workspace = true }
+iggy = { workspace = true }
+once_cell = { workspace = true }
+postcard = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+simd-json = { workspace = true }
+strum_macros = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+uuid = { workspace = true }
diff --git a/core/connectors/sdk/src/decoders/json.rs 
b/core/connectors/sdk/src/decoders/json.rs
new file mode 100644
index 00000000..f2bda418
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/json.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Stream decoder for the `Schema::Json` schema.
+pub struct JsonStreamDecoder;
+
+impl StreamDecoder for JsonStreamDecoder {
+    /// Schema handled by this decoder.
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    /// Parses the payload bytes with `simd_json` into a `Payload::Json` value.
+    ///
+    /// The failure is logged and reported as `Error::CannotDecode` when the bytes are not
+    /// valid JSON.
+    fn decode(&self, mut payload: Vec<u8>) -> Result<Payload, Error> {
+        match simd_json::to_owned_value(&mut payload) {
+            Ok(value) => Ok(Payload::Json(value)),
+            Err(error) => {
+                error!("Failed to decode JSON payload: {error}");
+                Err(Error::CannotDecode(self.schema()))
+            }
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/mod.rs 
b/core/connectors/sdk/src/decoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/decoders/raw.rs 
b/core/connectors/sdk/src/decoders/raw.rs
new file mode 100644
index 00000000..efbddc12
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/raw.rs
@@ -0,0 +1,31 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+
+/// Stream decoder for the `Schema::Raw` schema.
+pub struct RawStreamDecoder;
+
+impl StreamDecoder for RawStreamDecoder {
+    /// Schema handled by this decoder.
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    /// Wraps the payload bytes, untouched, in `Payload::Raw`; never fails.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        let decoded = Payload::Raw(payload);
+        Ok(decoded)
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/text.rs 
b/core/connectors/sdk/src/decoders/text.rs
new file mode 100644
index 00000000..6d7722cb
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Stream decoder for the `Schema::Text` schema.
+pub struct TextStreamDecoder;
+
+impl StreamDecoder for TextStreamDecoder {
+    /// Schema handled by this decoder.
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    /// Converts the payload bytes into a `Payload::Text` string.
+    ///
+    /// The failure is logged and reported as `Error::CannotDecode` when the bytes are not
+    /// valid UTF-8.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        match String::from_utf8(payload) {
+            Ok(text) => Ok(Payload::Text(text)),
+            Err(error) => {
+                error!("Failed to decode text payload: {error}");
+                Err(Error::CannotDecode(self.schema()))
+            }
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/json.rs 
b/core/connectors/sdk/src/encoders/json.rs
new file mode 100644
index 00000000..03f4e4f0
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/json.rs
@@ -0,0 +1,39 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Stream encoder for the `Schema::Json` schema.
+pub struct JsonStreamEncoder;
+
+impl StreamEncoder for JsonStreamEncoder {
+    /// Schema handled by this encoder.
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    /// Serializes a text or JSON payload to JSON bytes with `simd_json`.
+    ///
+    /// Returns `Error::InvalidJsonPayload` when the serialization fails and
+    /// `Error::InvalidPayloadType` for any other payload kind.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            Payload::Text(text) => {
+                simd_json::to_vec(&text).map_err(|_| Error::InvalidJsonPayload)
+            }
+            Payload::Json(json) => {
+                simd_json::to_vec(&json).map_err(|_| Error::InvalidJsonPayload)
+            }
+            _ => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/mod.rs 
b/core/connectors/sdk/src/encoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/encoders/raw.rs 
b/core/connectors/sdk/src/encoders/raw.rs
new file mode 100644
index 00000000..68b3b9bb
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/raw.rs
@@ -0,0 +1,34 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Stream encoder for the `Schema::Raw` schema.
+pub struct RawStreamEncoder;
+
+impl StreamEncoder for RawStreamEncoder {
+    /// Schema handled by this encoder.
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    /// Returns the bytes of a raw payload as they are; any other payload kind is rejected
+    /// with `Error::InvalidPayloadType`.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        let Payload::Raw(bytes) = payload else {
+            return Err(Error::InvalidPayloadType);
+        };
+        Ok(bytes)
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/text.rs 
b/core/connectors/sdk/src/encoders/text.rs
new file mode 100644
index 00000000..6fb2d4e9
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Stream encoder for the `Schema::Text` schema.
+pub struct TextStreamEncoder;
+
+impl StreamEncoder for TextStreamEncoder {
+    /// Schema handled by this encoder.
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    /// Encodes a text payload as its UTF-8 bytes and a JSON payload as JSON bytes
+    /// serialized with `simd_json`.
+    ///
+    /// Returns `Error::InvalidJsonPayload` when the JSON serialization fails and
+    /// `Error::InvalidPayloadType` for any other payload kind.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            Payload::Text(text) => {
+                let bytes = text.into_bytes();
+                Ok(bytes)
+            }
+            Payload::Json(json) => {
+                simd_json::to_vec(&json).map_err(|_| Error::InvalidJsonPayload)
+            }
+            _ => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
new file mode 100644
index 00000000..749c666e
--- /dev/null
+++ b/core/connectors/sdk/src/lib.rs
@@ -0,0 +1,239 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use decoders::{json::JsonStreamDecoder, raw::RawStreamDecoder, 
text::TextStreamDecoder};
+use encoders::{json::JsonStreamEncoder, raw::RawStreamEncoder, 
text::TextStreamEncoder};
+use iggy::prelude::{HeaderKey, HeaderValue};
+use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, sync::Arc};
+use strum_macros::{Display, IntoStaticStr};
+use thiserror::Error;
+use tokio::runtime::Runtime;
+
+pub mod decoders;
+pub mod encoders;
+pub mod sink;
+pub mod source;
+pub mod transforms;
+
+/// Lazily created Tokio runtime shared by every connector container in this library.
+static RUNTIME: OnceCell<Runtime> = OnceCell::new();
+
+/// Returns the shared Tokio runtime, creating it on first use.
+///
+/// # Panics
+/// Panics if the Tokio runtime cannot be created.
+pub fn get_runtime() -> &'static Runtime {
+    RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime"))
+}
+
+/// A source connector: something that produces messages.
+///
+/// `SourceContainer` calls `open` once, then calls `poll` repeatedly from a
+/// background task, and finally calls `close`.
+#[async_trait]
+pub trait Source: Send + Sync {
+    /// Prepares the source for polling.
+    async fn open(&mut self) -> Result<(), Error>;
+    /// Returns the next batch of produced messages together with their schema.
+    async fn poll(&self) -> Result<ProducedMessages, Error>;
+    /// Releases the resources held by the source.
+    async fn close(&mut self);
+}
+
+/// A sink connector: something that consumes batches of decoded messages.
+///
+/// `SinkContainer` calls `open` once, `consume` for every delivered batch,
+/// and `close` when the connector is shut down.
+#[async_trait]
+pub trait Sink: Send + Sync {
+    /// Prepares the sink for consuming messages.
+    async fn open(&mut self) -> Result<(), Error>;
+    /// Handles one batch of messages.
+    ///
+    /// `topic_metadata` names the stream and topic, `messages_metadata`
+    /// carries the partition, offset and schema of the batch, and `messages`
+    /// holds the payloads already decoded according to that schema.
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error>;
+    /// Releases the resources held by the sink.
+    async fn close(&mut self);
+}
+
+/// A decoded message payload; one variant per supported [`Schema`].
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Payload {
+    /// A parsed JSON document.
+    Json(simd_json::OwnedValue),
+    /// Opaque bytes, passed through untouched.
+    Raw(Vec<u8>),
+    /// A UTF-8 string.
+    Text(String),
+}
+
+impl Payload {
+    /// Converts the payload into its byte representation.
+    ///
+    /// JSON is serialized with `simd_json`; raw bytes and text are returned
+    /// without re-encoding.
+    ///
+    /// # Errors
+    /// Returns [`Error::InvalidJsonPayload`] if JSON serialization fails.
+    pub fn try_into_vec(self) -> Result<Vec<u8>, Error> {
+        match self {
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            Payload::Raw(value) => Ok(value),
+            Payload::Text(text) => Ok(text.into_bytes()),
+        }
+    }
+}
+
+/// Human-readable rendering of a payload, prefixed with the variant name.
+impl std::fmt::Display for Payload {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Payload::Json(document) => {
+                // A document that fails to serialize is rendered as an empty string.
+                let rendered = simd_json::to_string_pretty(document).unwrap_or_default();
+                write!(f, "Json({})", rendered)
+            }
+            Payload::Raw(bytes) => write!(f, "Raw({:#?})", bytes),
+            Payload::Text(content) => write!(f, "Text({})", content),
+        }
+    }
+}
+
+/// The wire format of message payloads.
+///
+/// Serialized (serde) and displayed (strum) using the lowercase names
+/// `json`, `raw` and `text`.
+#[repr(C)]
+#[derive(
+    Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Display, IntoStaticStr,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum Schema {
+    #[strum(to_string = "json")]
+    Json,
+    #[strum(to_string = "raw")]
+    Raw,
+    #[strum(to_string = "text")]
+    Text,
+}
+
+impl Schema {
+    /// Decodes raw bytes into the [`Payload`] variant matching this schema.
+    ///
+    /// # Errors
+    /// Returns [`Error::InvalidJsonPayload`] when the bytes are not valid JSON
+    /// and [`Error::InvalidTextPayload`] when they are not valid UTF-8.
+    pub fn try_into_payload(self, mut value: Vec<u8>) -> Result<Payload, Error> {
+        match self {
+            // `simd_json` parses in place, hence the mutable buffer.
+            Schema::Json => simd_json::to_owned_value(&mut value)
+                .map(Payload::Json)
+                .map_err(|_| Error::InvalidJsonPayload),
+            Schema::Raw => Ok(Payload::Raw(value)),
+            Schema::Text => String::from_utf8(value)
+                .map(Payload::Text)
+                .map_err(|_| Error::InvalidTextPayload),
+        }
+    }
+
+    /// Returns the stream decoder for this schema.
+    pub fn decoder(self) -> Arc<dyn StreamDecoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamDecoder),
+            Schema::Raw => Arc::new(RawStreamDecoder),
+            Schema::Text => Arc::new(TextStreamDecoder),
+        }
+    }
+
+    /// Returns the stream encoder for this schema.
+    pub fn encoder(self) -> Arc<dyn StreamEncoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamEncoder),
+            Schema::Raw => Arc::new(RawStreamEncoder),
+            Schema::Text => Arc::new(TextStreamEncoder),
+        }
+    }
+}
+
+/// Names the stream and topic a batch of messages belongs to.
+///
+/// NOTE(review): `#[repr(C)]` does not make the `String` fields FFI-safe; in
+/// the sink container this type crosses the plugin boundary postcard-encoded.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TopicMetadata {
+    pub stream: String,
+    pub topic: String,
+}
+
+/// Per-batch metadata handed to a sink together with the messages.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MessagesMetadata {
+    pub partition_id: u32,
+    pub current_offset: u64,
+    /// Schema used to decode every payload in the batch.
+    pub schema: Schema,
+}
+
+/// A message with its identifier, offset, optional headers and payload bytes.
+// NOTE(review): presumably the payload is still undecoded here — confirm
+// against the runtime code that builds this type.
+#[repr(C)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReceivedMessage {
+    pub id: u128,
+    pub offset: u64,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Vec<u8>,
+}
+
+/// A batch returned by [`Source::poll`]; the source container serializes it
+/// with postcard before handing it to the send callback.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ProducedMessages {
+    pub schema: Schema,
+    pub messages: Vec<ProducedMessage>,
+}
+
+/// A single message produced by a source.
+// NOTE(review): presumably `payload` is encoded per `ProducedMessages::schema` — confirm.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ProducedMessage {
+    pub id: Option<u128>,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Vec<u8>,
+}
+
+/// A message whose payload has been decoded; the unit that transforms operate on.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DecodedMessage {
+    pub id: Option<u128>,
+    pub offset: Option<u64>,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Payload,
+}
+
+/// A batch of still-encoded messages; the sink container decodes it from
+/// postcard bytes.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RawMessages {
+    pub schema: Schema,
+    pub messages: Vec<RawMessage>,
+}
+
+/// A single still-encoded message inside [`RawMessages`].
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RawMessage {
+    pub offset: u64,
+    /// Postcard-encoded headers; the sink container treats an empty buffer as "no headers".
+    pub headers: Vec<u8>,
+    /// Payload bytes, decoded by the sink container using the batch schema.
+    pub payload: Vec<u8>,
+}
+
+/// A fully decoded message as delivered to [`Sink::consume`].
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ConsumedMessage {
+    pub offset: u64,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Payload,
+}
+
+/// Turns payload bytes into a [`Payload`] for one particular [`Schema`].
+pub trait StreamDecoder: Send + Sync {
+    /// The schema this decoder handles.
+    fn schema(&self) -> Schema;
+    /// Decodes the given bytes, consuming the buffer.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error>;
+}
+
+/// Turns a [`Payload`] into bytes for one particular [`Schema`].
+pub trait StreamEncoder: Send + Sync {
+    /// The schema this encoder produces.
+    fn schema(&self) -> Schema;
+    /// Encodes the given payload, consuming it.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error>;
+}
+
+/// Errors shared by the connector SDK, connectors and transforms.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Error)]
+pub enum Error {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("HTTP request failed: {0}")]
+    HttpRequestFailed(String),
+    #[error("Init error: {0}")]
+    InitError(String),
+    /// The payload variant is not supported by the encoder it was given to.
+    #[error("Invalid payload type")]
+    InvalidPayloadType,
+    /// JSON parsing or serialization failed.
+    #[error("Invalid JSON payload.")]
+    InvalidJsonPayload,
+    /// The bytes are not valid UTF-8.
+    #[error("Invalid text payload.")]
+    InvalidTextPayload,
+    #[error("Cannot decode schema {0}")]
+    CannotDecode(Schema),
+}
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
new file mode 100644
index 00000000..bd836ac6
--- /dev/null
+++ b/core/connectors/sdk/src/sink.rs
@@ -0,0 +1,240 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::de::DeserializeOwned;
+use tokio::sync::watch;
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
+use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink, 
TopicMetadata, get_runtime};
+
+/// C ABI signature for delivering one batch to a sink plugin.
+///
+/// The parameters mirror the `consume` symbol exported by `sink_connector!`:
+/// three pointer/length pairs for topic metadata, messages metadata and the
+/// messages themselves, returning a status code.
+// NOTE(review): presumably used by the runtime to type the loaded symbol — confirm.
+pub type ConsumeCallback = extern "C" fn(
+    plugin_id: u32,
+    topic_meta_ptr: *const u8,
+    topic_meta_len: usize,
+    messages_meta_ptr: *const u8,
+    messages_meta_len: usize,
+    messages_ptr: *const u8,
+    messages_len: usize,
+) -> i32;
+
+/// Holds one sink instance behind the plugin's C ABI entry points.
+#[derive(Debug)]
+pub struct SinkContainer<T: Sink + std::fmt::Debug> {
+    /// Plugin ID, used in log messages.
+    id: u32,
+    /// `None` until `open` has constructed the sink, and again after `close`.
+    sink: Option<T>,
+    /// Shutdown signal; signalled in `close` but never assigned by the
+    /// methods of this container.
+    shutdown: Option<watch::Sender<()>>,
+}
+
+impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
+    /// Creates an empty container; the sink itself is constructed by `open`.
+    pub const fn new(id: u32) -> Self {
+        Self {
+            id,
+            sink: None,
+            shutdown: None,
+        }
+    }
+
+    /// Parses the JSON config, builds the sink through `factory` and opens it.
+    ///
+    /// Returns `0` on success, `-1` when the config is not valid UTF-8 or not
+    /// valid JSON for `C`, and `1` when the sink's `open` fails. The sink is
+    /// stored in the container even if its `open` failed.
+    ///
+    /// # Safety
+    /// Unless `config_ptr` is null or `config_len` is zero, `config_ptr` must
+    /// be valid for reads of `config_len` bytes for the duration of the call.
+    pub unsafe fn open<F, C>(
+        &mut self,
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        factory: F,
+    ) -> i32
+    where
+        F: FnOnce(u32, C) -> T,
+        C: DeserializeOwned,
+    {
+        // Ignore the result: a subscriber may already have been installed.
+        _ = fmt::try_init();
+
+        // SAFETY: the caller guarantees the pointer/length pair is readable.
+        let config_bytes = unsafe { raw_bytes(config_ptr, config_len) };
+        let Ok(config_str) = std::str::from_utf8(config_bytes) else {
+            error!("Config of sink with ID: {id} is not valid UTF-8.");
+            return -1;
+        };
+
+        let config = match serde_json::from_str::<C>(config_str) {
+            Ok(config) => config,
+            Err(error) => {
+                error!("Failed to parse config of sink with ID: {id}. {error}");
+                return -1;
+            }
+        };
+
+        let mut sink = factory(id, config);
+        let result = get_runtime().block_on(sink.open());
+        self.id = id;
+        self.sink = Some(sink);
+        match result {
+            Ok(()) => 0,
+            Err(error) => {
+                error!("Failed to open sink with ID: {id}. {error}");
+                1
+            }
+        }
+    }
+
+    /// Closes the sink and removes it from the container.
+    ///
+    /// Returns `0` on success and `-1` when no sink has been opened.
+    ///
+    /// # Safety
+    /// This method performs no unsafe operations itself; it is `unsafe` only
+    /// because it is part of the FFI-facing API.
+    pub unsafe fn close(&mut self) -> i32 {
+        let Some(mut sink) = self.sink.take() else {
+            error!(
+                "Sink with ID: {} is not initialized - cannot close.",
+                self.id
+            );
+            return -1;
+        };
+
+        info!("Closing sink with ID: {}...", self.id);
+        if let Some(tx) = self.shutdown.take() {
+            // A send error only means nobody is listening any more.
+            let _ = tx.send(());
+        }
+
+        get_runtime().block_on(sink.close());
+        info!("Closed sink with ID: {}", self.id);
+        0
+    }
+
+    /// Decodes one postcard-encoded batch and passes it to the sink.
+    ///
+    /// Messages whose headers or payload cannot be decoded are logged and
+    /// skipped; the remaining messages are still delivered. Returns `0` on
+    /// success, `-1` when the sink is missing or the batch envelope cannot be
+    /// decoded, and `1` when the sink's `consume` fails.
+    ///
+    /// # Safety
+    /// Each pointer/length pair must either be null/zero-length or be valid
+    /// for reads of the given number of bytes for the duration of the call.
+    pub unsafe fn consume(
+        &self,
+        topic_meta_ptr: *const u8,
+        topic_meta_len: usize,
+        messages_meta_ptr: *const u8,
+        messages_meta_len: usize,
+        messages_ptr: *const u8,
+        messages_len: usize,
+    ) -> i32 {
+        let Some(sink) = self.sink.as_ref() else {
+            error!(
+                "Sink with ID: {} is not initialized - cannot consume.",
+                self.id
+            );
+            return -1;
+        };
+
+        // SAFETY: the caller guarantees every pointer/length pair is readable.
+        // The slices are only borrowed while decoding, so no copy is needed.
+        let (topic_meta, messages_meta, raw_batch) = unsafe {
+            (
+                raw_bytes(topic_meta_ptr, topic_meta_len),
+                raw_bytes(messages_meta_ptr, messages_meta_len),
+                raw_bytes(messages_ptr, messages_len),
+            )
+        };
+
+        let Ok(topic_metadata) = postcard::from_bytes::<TopicMetadata>(topic_meta) else {
+            error!("Failed to decode topic metadata");
+            return -1;
+        };
+
+        let Ok(messages_metadata) = postcard::from_bytes::<MessagesMetadata>(messages_meta)
+        else {
+            error!("Failed to decode messages metadata");
+            return -1;
+        };
+
+        let Ok(raw_messages) = postcard::from_bytes::<RawMessages>(raw_batch) else {
+            error!("Failed to decode raw messages");
+            return -1;
+        };
+
+        let mut messages = Vec::with_capacity(raw_messages.messages.len());
+        for message in raw_messages.messages {
+            // An empty header buffer means the message carries no headers.
+            let headers = if message.headers.is_empty() {
+                None
+            } else {
+                let Ok(headers) = postcard::from_bytes(&message.headers) else {
+                    error!("Failed to decode message headers");
+                    continue;
+                };
+                Some(headers)
+            };
+
+            let Ok(payload) = messages_metadata.schema.try_into_payload(message.payload) else {
+                error!("Failed to decode message payload");
+                continue;
+            };
+
+            messages.push(ConsumedMessage {
+                offset: message.offset,
+                headers,
+                payload,
+            });
+        }
+
+        let result =
+            get_runtime().block_on(sink.consume(&topic_metadata, messages_metadata, messages));
+        match result {
+            Ok(()) => 0,
+            Err(error) => {
+                error!(
+                    "Sink with ID: {} failed to consume messages. {error}",
+                    self.id
+                );
+                1
+            }
+        }
+    }
+}
+
+/// Builds a byte slice from an FFI pointer/length pair, mapping a null pointer
+/// or a zero length to an empty slice.
+///
+/// # Safety
+/// If `ptr` is non-null and `len` is non-zero, `ptr` must be valid for reads of
+/// `len` bytes for the whole lifetime `'a`.
+unsafe fn raw_bytes<'a>(ptr: *const u8, len: usize) -> &'a [u8] {
+    if ptr.is_null() || len == 0 {
+        return &[];
+    }
+    // SAFETY: `ptr` is non-null and, per the contract above, readable for `len` bytes.
+    unsafe { std::slice::from_raw_parts(ptr, len) }
+}
+
+/// Generates the C ABI entry points (`open`, `consume`, `close`) of a sink
+/// plugin for the given [`Sink`](crate::Sink) type.
+///
+/// The type must provide a `new(id, config)` constructor. The invoking crate
+/// must depend on `dashmap` and `once_cell`, which the expansion imports.
+#[macro_export]
+macro_rules! sink_connector {
+    ($type:ty) => {
+        // Compile-time check that the type implements `Sink`.
+        const _: fn() = || {
+            fn assert_trait<T: $crate::Sink>() {}
+            assert_trait::<$type>();
+        };
+
+        use dashmap::DashMap;
+        use once_cell::sync::Lazy;
+        use $crate::sink::SinkContainer;
+
+        static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> = Lazy::new(DashMap::new);
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
+            let mut container = SinkContainer::new(id);
+            // SAFETY: the pointer/length pair is forwarded unchanged from the caller.
+            let result = unsafe { container.open(id, config_ptr, config_len, <$type>::new) };
+            // The container is registered even when opening failed.
+            INSTANCES.insert(id, container);
+            result
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn consume(
+            id: u32,
+            topic_meta_ptr: *const u8,
+            topic_meta_len: usize,
+            messages_meta_ptr: *const u8,
+            messages_meta_len: usize,
+            messages_ptr: *const u8,
+            messages_len: usize,
+        ) -> i32 {
+            let Some(instance) = INSTANCES.get(&id) else {
+                eprintln!(
+                    "Sink connector with ID: {id} was not found and consume cannot be invoked."
+                );
+                return -1;
+            };
+            // SAFETY: the pointer/length pairs are forwarded unchanged from the caller.
+            unsafe {
+                instance.consume(
+                    topic_meta_ptr,
+                    topic_meta_len,
+                    messages_meta_ptr,
+                    messages_meta_len,
+                    messages_ptr,
+                    messages_len,
+                )
+            }
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn close(id: u32) -> i32 {
+            let Some((_, mut container)) = INSTANCES.remove(&id) else {
+                eprintln!("Sink connector with ID: {id} was not found and cannot be closed.");
+                return -1;
+            };
+            // SAFETY: `close` dereferences no caller-provided pointers.
+            unsafe { container.close() }
+        }
+    };
+}
diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs
new file mode 100644
index 00000000..ef47b7ee
--- /dev/null
+++ b/core/connectors/sdk/src/source.rs
@@ -0,0 +1,218 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Source, get_runtime};
+use serde::de::DeserializeOwned;
+use std::sync::Arc;
+use tokio::{sync::watch, task::JoinHandle};
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
+/// C-layout description of a message as an offset plus two pointer/length
+/// pairs (headers and payload).
+// NOTE(review): not referenced elsewhere in this module; ownership and
+// lifetime of the pointed-to buffers are not defined here — confirm with callers.
+#[repr(C)]
+pub struct RawMessage {
+    pub offset: u64,
+    pub headers_ptr: *const u8,
+    pub headers_len: usize,
+    pub payload_ptr: *const u8,
+    pub payload_len: usize,
+}
+
+/// C ABI signature matching the `handle` symbol exported by `source_connector!`:
+/// it receives the plugin ID and the callback used to send produced messages.
+pub type HandleCallback = extern "C" fn(plugin_id: u32, callback: SendCallback) -> i32;
+
+/// C ABI callback invoked by the source container with one postcard-encoded
+/// batch of produced messages. The buffer is owned by the container and is
+/// only passed for the duration of the call.
+pub type SendCallback = extern "C" fn(plugin_id: u32, messages_ptr: *const u8, messages_len: usize);
+
+/// Holds one source instance and its background polling task behind the
+/// plugin's C ABI entry points.
+#[derive(Debug)]
+pub struct SourceContainer<T: Source + std::fmt::Debug> {
+    /// Plugin ID, used in log messages and passed to the send callback.
+    id: u32,
+    /// Shared with the polling task; `None` until `open` and after `close`.
+    source: Option<Arc<T>>,
+    /// Signals the polling task to stop; set by `handle`.
+    shutdown: Option<watch::Sender<()>>,
+    /// Handle of the polling task spawned by `handle`.
+    task: Option<JoinHandle<()>>,
+}
+
+impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> {
+    /// Creates an empty container; the source itself is constructed by `open`.
+    pub const fn new(id: u32) -> Self {
+        Self {
+            id,
+            source: None,
+            shutdown: None,
+            task: None,
+        }
+    }
+
+    /// Parses the JSON config, builds the source through `factory` and opens it.
+    ///
+    /// Returns `0` on success, `-1` when the config is not valid UTF-8 or not
+    /// valid JSON for `C`, and `1` when the source's `open` fails. The source
+    /// is stored in the container even if its `open` failed.
+    ///
+    /// # Safety
+    /// Unless `config_ptr` is null or `config_len` is zero, `config_ptr` must
+    /// be valid for reads of `config_len` bytes for the duration of the call.
+    pub unsafe fn open<F, C>(
+        &mut self,
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        factory: F,
+    ) -> i32
+    where
+        F: FnOnce(u32, C) -> T,
+        C: DeserializeOwned,
+    {
+        // Ignore the result: a subscriber may already have been installed.
+        _ = fmt::try_init();
+
+        // SAFETY: the caller guarantees the pointer/length pair is readable.
+        let config_bytes = unsafe { raw_bytes(config_ptr, config_len) };
+        let Ok(config_str) = std::str::from_utf8(config_bytes) else {
+            error!("Config of source with ID: {id} is not valid UTF-8.");
+            return -1;
+        };
+
+        let config = match serde_json::from_str::<C>(config_str) {
+            Ok(config) => config,
+            Err(error) => {
+                error!("Failed to parse config of source with ID: {id}. {error}");
+                return -1;
+            }
+        };
+
+        let mut source = factory(id, config);
+        let result = get_runtime().block_on(source.open());
+        self.id = id;
+        self.source = Some(Arc::new(source));
+        match result {
+            Ok(()) => 0,
+            Err(error) => {
+                error!("Failed to open source with ID: {id}. {error}");
+                1
+            }
+        }
+    }
+
+    /// Stops the polling task, waits for it to finish and closes the source.
+    ///
+    /// Returns `0` on success and `-1` when no source has been opened or the
+    /// source is still shared and therefore cannot be closed.
+    ///
+    /// # Safety
+    /// This method performs no unsafe operations itself; it is `unsafe` only
+    /// because it is part of the FFI-facing API.
+    pub unsafe fn close(&mut self) -> i32 {
+        let Some(source) = self.source.take() else {
+            error!(
+                "Source with ID: {} is not initialized - cannot close.",
+                self.id
+            );
+            return -1;
+        };
+
+        info!("Closing source with ID: {}...", self.id);
+        if let Some(sender) = self.shutdown.take() {
+            // A send error only means the polling task has already exited.
+            let _ = sender.send(());
+        }
+
+        let runtime = get_runtime();
+        if let Some(handle) = self.task.take() {
+            // Wait for the task so that it drops its clone of the source.
+            let _ = runtime.block_on(handle);
+        }
+
+        // `Source::close` needs `&mut`, so the `Arc` must be uniquely owned here.
+        let Ok(mut source) = Arc::try_unwrap(source) else {
+            error!("Source with ID: {} was already closed.", self.id);
+            return -1;
+        };
+
+        runtime.block_on(source.close());
+        info!("Closed source with ID: {}", self.id);
+        0
+    }
+
+    /// Spawns the background task that polls the source and forwards every
+    /// batch to `callback`.
+    ///
+    /// Returns `0` once the task is spawned and `-1` when no source has been
+    /// opened.
+    ///
+    /// # Safety
+    /// `callback` must be a valid function pointer that stays callable, from
+    /// a runtime worker thread, until `close` has returned.
+    pub unsafe fn handle(&mut self, callback: SendCallback) -> i32 {
+        let Some(source) = self.source.as_ref() else {
+            error!(
+                "Source with ID: {} is not initialized - cannot handle.",
+                self.id
+            );
+            return -1;
+        };
+
+        let (shutdown_tx, shutdown_rx) = watch::channel(());
+        let plugin_id = self.id;
+        let source = Arc::clone(source);
+        let handle = get_runtime().spawn(async move {
+            let _ = handle_messages(plugin_id, source, callback, shutdown_rx).await;
+        });
+
+        self.shutdown = Some(shutdown_tx);
+        self.task = Some(handle);
+        0
+    }
+}
+
+/// Builds a byte slice from an FFI pointer/length pair, mapping a null pointer
+/// or a zero length to an empty slice.
+///
+/// # Safety
+/// If `ptr` is non-null and `len` is non-zero, `ptr` must be valid for reads of
+/// `len` bytes for the whole lifetime `'a`.
+unsafe fn raw_bytes<'a>(ptr: *const u8, len: usize) -> &'a [u8] {
+    if ptr.is_null() || len == 0 {
+        return &[];
+    }
+    // SAFETY: `ptr` is non-null and, per the contract above, readable for `len` bytes.
+    unsafe { std::slice::from_raw_parts(ptr, len) }
+}
+
+/// Polls the source in a loop and forwards every batch, postcard-encoded, to
+/// `callback` until the shutdown channel fires or its sender is dropped.
+///
+/// Poll and serialization failures are logged and the loop continues.
+/// Currently always returns `Ok(())`.
+async fn handle_messages<T: Source>(
+    plugin_id: u32,
+    source: Arc<T>,
+    callback: SendCallback,
+    mut shutdown: watch::Receiver<()>,
+) -> Result<(), Error> {
+    loop {
+        tokio::select! {
+            _ = shutdown.changed() => {
+                info!("Shutting down source container with ID: {plugin_id}");
+                break;
+            }
+            messages = source.poll() => {
+                // NOTE(review): a failed poll is retried immediately, without
+                // any delay; a source that keeps failing makes this loop spin.
+                let Ok(messages) = messages else {
+                    error!("Failed to poll messages for source container with ID: {plugin_id}");
+                    continue;
+                };
+
+                let Ok(messages) = postcard::to_allocvec(&messages) else {
+                    error!("Failed to serialize messages for source container with ID: {plugin_id}");
+                    continue;
+                };
+
+                // The buffer is dropped right after the call returns.
+                callback(plugin_id, messages.as_ptr(), messages.len());
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Generates the C ABI entry points (`open`, `handle`, `close`) of a source
+/// plugin for the given [`Source`](crate::Source) type.
+///
+/// The type must provide a `new(id, config)` constructor. The invoking crate
+/// must depend on `dashmap` and `once_cell`, which the expansion imports.
+#[macro_export]
+macro_rules! source_connector {
+    ($type:ty) => {
+        // Compile-time check that the type implements `Source`.
+        const _: fn() = || {
+            fn assert_trait<T: $crate::Source>() {}
+            assert_trait::<$type>();
+        };
+
+        use dashmap::DashMap;
+        use once_cell::sync::Lazy;
+        use $crate::source::SendCallback;
+        use $crate::source::SourceContainer;
+
+        static INSTANCES: Lazy<DashMap<u32, SourceContainer<$type>>> = Lazy::new(DashMap::new);
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
+            let mut container = SourceContainer::new(id);
+            // SAFETY: the pointer/length pair is forwarded unchanged from the caller.
+            let result = unsafe { container.open(id, config_ptr, config_len, <$type>::new) };
+            // The container is registered even when opening failed.
+            INSTANCES.insert(id, container);
+            result
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn handle(id: u32, callback: SendCallback) -> i32 {
+            let Some(mut instance) = INSTANCES.get_mut(&id) else {
+                eprintln!("Source connector with ID: {id} was not found and cannot be handled.");
+                return -1;
+            };
+            // SAFETY: the callback is forwarded unchanged from the caller.
+            unsafe { instance.handle(callback) }
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn close(id: u32) -> i32 {
+            let Some((_, mut container)) = INSTANCES.remove(&id) else {
+                eprintln!("Source connector with ID: {id} was not found and cannot be closed.");
+                return -1;
+            };
+            // SAFETY: `close` dereferences no caller-provided pointers.
+            unsafe { container.close() }
+        }
+    };
+}
diff --git a/core/connectors/sdk/src/transforms/add_fields.rs b/core/connectors/sdk/src/transforms/add_fields.rs
new file mode 100644
index 00000000..569a16b8
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/add_fields.rs
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+use strum_macros::{Display, IntoStaticStr};
+
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+
+use super::{Transform, TransformType};
+
+/// Configuration of the `add_fields` transform: the fields to insert.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AddFieldsConfig {
+    fields: Vec<Field>,
+}
+
+/// Transform that inserts configured fields into JSON object payloads.
+pub struct AddFields {
+    fields: Vec<Field>,
+}
+
+/// One field to add: the object key and how its value is obtained.
+#[derive(Debug, Serialize, Deserialize)]
+struct Field {
+    key: String,
+    value: FieldValue,
+}
+
+/// Where the value of an added field comes from.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum FieldValue {
+    /// A fixed JSON value copied into every message.
+    Static(simd_json::OwnedValue),
+    /// A value generated anew for every message.
+    Computed(ComputedValue),
+}
+
+/// Kinds of values generated at transform time: the current UTC time in
+/// several representations, or a freshly generated UUID.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
+#[serde(rename_all = "snake_case")]
+enum ComputedValue {
+    #[strum(to_string = "date_time")]
+    DateTime,
+    #[strum(to_string = "timestamp_nanos")]
+    TimestampNanos,
+    #[strum(to_string = "timestamp_micros")]
+    TimestampMicros,
+    #[strum(to_string = "timestamp_millis")]
+    TimestampMillis,
+    #[strum(to_string = "timestamp_seconds")]
+    TimestampSeconds,
+    #[strum(to_string = "uuid_v4")]
+    UuidV4,
+    #[strum(to_string = "uuid_v7")]
+    UuidV7,
+}
+
+impl AddFields {
+    /// Builds the transform, taking over the field list from the config.
+    pub fn new(config: AddFieldsConfig) -> Self {
+        let AddFieldsConfig { fields } = config;
+        Self { fields }
+    }
+}
+
+impl Transform for AddFields {
+    fn r#type(&self) -> TransformType {
+        TransformType::AddFields
+    }
+
+    /// Inserts every configured field into the message's JSON object,
+    /// overwriting keys that already exist.
+    ///
+    /// Messages whose payload is not a JSON object are returned unchanged.
+    /// This implementation never drops a message and never fails.
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if self.fields.is_empty() {
+            return Ok(Some(message));
+        }
+
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
+            return Ok(Some(message));
+        };
+
+        for field in &self.fields {
+            let value: OwnedValue = match &field.value {
+                FieldValue::Static(value) => value.clone(),
+                FieldValue::Computed(computed) => match computed {
+                    ComputedValue::DateTime => chrono::Utc::now().to_rfc3339().into(),
+                    ComputedValue::TimestampMillis => {
+                        chrono::Utc::now().timestamp_millis().into()
+                    }
+                    ComputedValue::TimestampMicros => {
+                        chrono::Utc::now().timestamp_micros().into()
+                    }
+                    // `timestamp_nanos_opt` returns an `Option`; it is converted as-is.
+                    ComputedValue::TimestampNanos => {
+                        chrono::Utc::now().timestamp_nanos_opt().into()
+                    }
+                    ComputedValue::TimestampSeconds => chrono::Utc::now().timestamp().into(),
+                    ComputedValue::UuidV4 => uuid::Uuid::new_v4().to_string().into(),
+                    ComputedValue::UuidV7 => uuid::Uuid::now_v7().to_string().into(),
+                },
+            };
+            map.insert(field.key.clone(), value);
+        }
+
+        Ok(Some(message))
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/delete_fields.rs b/core/connectors/sdk/src/transforms/delete_fields.rs
new file mode 100644
index 00000000..d7dd755a
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/delete_fields.rs
@@ -0,0 +1,61 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashSet;
+
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+
+use super::{Transform, TransformType};
+
+/// Configuration of the `delete_fields` transform: the keys to remove.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeleteFieldsConfig {
+    fields: Vec<String>,
+}
+
+/// Transform that removes configured top-level keys from JSON object payloads.
+pub struct DeleteFields {
+    // Stored as a set so each key is looked up directly during `retain`.
+    fields: HashSet<String>,
+}
+
+impl DeleteFields {
+    /// Builds the transform; duplicate keys in the config collapse into one.
+    pub fn new(config: DeleteFieldsConfig) -> Self {
+        let fields = HashSet::from_iter(config.fields);
+        Self { fields }
+    }
+}
+
+impl Transform for DeleteFields {
+    fn r#type(&self) -> TransformType {
+        TransformType::DeleteFields
+    }
+
+    /// Removes every configured key from the top level of the message's JSON
+    /// object.
+    ///
+    /// Messages whose payload is not a JSON object are returned unchanged.
+    /// This implementation never drops a message and never fails.
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        // Nothing to delete: skip walking the object.
+        if self.fields.is_empty() {
+            return Ok(Some(message));
+        }
+
+        if let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload {
+            map.retain(|key, _| !self.fields.contains(key));
+        }
+
+        Ok(Some(message))
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs
new file mode 100644
index 00000000..35cf3382
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -0,0 +1,45 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use strum_macros::{Display, IntoStaticStr};
+
+use crate::{DecodedMessage, Error, TopicMetadata};
+
+pub mod add_fields;
+pub mod delete_fields;
+
+/// A message transformation step.
+pub trait Transform: Send + Sync {
+    /// Identifies which kind of transform this is.
+    fn r#type(&self) -> TransformType;
+    /// Transforms a single decoded message.
+    ///
+    /// The return type lets an implementation return `Ok(None)` instead of a
+    /// message; the `add_fields` and `delete_fields` transforms always return
+    /// `Ok(Some(..))`.
+    fn transform(
+        &self,
+        metadata: &TopicMetadata,
+        message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error>;
+}
+
+/// The available transforms, named in snake_case (`add_fields`,
+/// `delete_fields`) both for serde and for display.
+#[derive(
+    Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Display, IntoStaticStr,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum TransformType {
+    #[strum(to_string = "add_fields")]
+    AddFields,
+    #[strum(to_string = "delete_fields")]
+    DeleteFields,
+}
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml
new file mode 100644
index 00000000..cf6a2170
--- /dev/null
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_quickwit_sink"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+once_cell = { workspace = true }
+reqwest = { workspace = true }
+serde = { workspace = true }
+serde_yml = { workspace = true }
+simd-json = { workspace = true }
+tracing = { workspace = true }
+
+[package.metadata.cargo-machete]
+ignored = ["dashmap", "once_cell"]
diff --git a/core/connectors/sinks/quickwit_sink/src/lib.rs 
b/core/connectors/sinks/quickwit_sink/src/lib.rs
new file mode 100644
index 00000000..cf0962cd
--- /dev/null
+++ b/core/connectors/sinks/quickwit_sink/src/lib.rs
@@ -0,0 +1,209 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, 
sink_connector,
+};
+use serde::{Deserialize, Serialize};
+use tracing::{error, info, warn};
+
+sink_connector!(QuickwitSink);
+
+/// Sink connector that forwards JSON messages to a Quickwit index over HTTP.
+#[derive(Debug)]
+pub struct QuickwitSink {
+    /// Numeric ID passed to `new`; used in log messages.
+    id: u32,
+    /// Configuration the sink was created with.
+    config: QuickwitSinkConfig,
+    /// HTTP client used for every request sent by this sink.
+    client: reqwest::Client,
+    /// The `index_id` parsed from the YAML stored in `config.index`.
+    index_id: String,
+}
+
+/// Configuration of the Quickwit sink.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct QuickwitSinkConfig {
+    /// Base URL of the Quickwit server; `/api/v1/...` paths are appended to it.
+    url: String,
+    /// Index configuration as a YAML document. It must contain an `index_id` key and is
+    /// sent verbatim as the request body when the index is created.
+    index: String,
+}
+
+/// The part of the YAML index configuration that the sink reads itself.
+#[derive(Debug, Serialize, Deserialize)]
+struct IndexConfig {
+    /// Identifier of the index, used to build the index and ingest URLs.
+    index_id: String,
+}
+
+impl QuickwitSink {
+    /// Creates a sink from its numeric ID and configuration.
+    ///
+    /// The YAML held in `config.index` is parsed to extract the `index_id`, and a new
+    /// `reqwest::Client` is created for the requests issued by this sink.
+    ///
+    /// # Panics
+    ///
+    /// Panics with "Invalid index config." when `config.index` cannot be deserialized
+    /// into an `IndexConfig`.
+    pub fn new(id: u32, config: QuickwitSinkConfig) -> Self {
+        let index_config =
+            serde_yml::from_str::<IndexConfig>(&config.index).expect("Invalid index config.");
+        QuickwitSink {
+            id,
+            config,
+            index_id: index_config.index_id,
+            client: reqwest::Client::new(),
+        }
+    }
+
+    /// Returns the configured URL without trailing slashes, so that appending an
+    /// `/api/...` path never yields a double slash.
+    fn base_url(&self) -> &str {
+        self.config.url.trim_end_matches('/')
+    }
+
+    /// Checks whether the index exists by sending `GET {url}/api/v1/indexes/{index_id}`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::HttpRequestFailed` when the request cannot be sent or when the
+    /// response status is neither a success nor `404 Not Found`.
+    async fn has_index(&self) -> Result<bool, Error> {
+        let url = format!("{}/api/v1/indexes/{}", self.base_url(), self.index_id);
+        let response = self.client.get(&url).send().await.map_err(|error| {
+            error!(
+                "Failed to send HTTP request to check if index with ID: {} exists. {error}",
+                self.index_id
+            );
+            Error::HttpRequestFailed(error.to_string())
+        })?;
+        let status = response.status();
+        if status.is_success() {
+            Ok(true)
+        } else if status == reqwest::StatusCode::NOT_FOUND {
+            Ok(false)
+        } else {
+            Err(Error::HttpRequestFailed(format!(
+                "Unexpected status code: {status}",
+            )))
+        }
+    }
+
+    /// Creates the index by posting the raw YAML from `config.index` to
+    /// `{url}/api/v1/indexes` with the `application/yaml` content type.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::HttpRequestFailed` when the request cannot be sent and
+    /// `Error::InitError` when the server answers with a non-success status.
+    async fn create_index(&self) -> Result<(), Error> {
+        info!("Creating index: {}", self.index_id);
+        let url = format!("{}/api/v1/indexes", self.base_url());
+        let response = self
+            .client
+            .post(&url)
+            .header("content-type", "application/yaml")
+            .body(self.config.index.clone())
+            .send()
+            .await
+            .map_err(|error| {
+                error!(
+                    "Failed to send HTTP request to create index: {}. {error}",
+                    self.index_id
+                );
+                Error::HttpRequestFailed(error.to_string())
+            })?;
+
+        let status = response.status();
+        if !status.is_success() {
+            let reason = response.text().await.unwrap_or_default();
+            error!(
+                "Received an invalid HTTP response when creating index: {}. Status code: {status}, reason: {reason}",
+                self.index_id
+            );
+            return Err(Error::InitError(format!(
+                "Failed to create index: {}. {reason}",
+                self.index_id
+            )));
+        }
+
+        info!("Created index: {}", self.index_id);
+        Ok(())
+    }
+
+    /// Sends the given JSON values to `{url}/api/v1/{index_id}/ingest?commit=auto` as
+    /// newline-delimited JSON.
+    ///
+    /// Values that cannot be serialized are skipped with a warning, and the final log
+    /// line reports only the number of records that were actually sent. When nothing is
+    /// left to send, no request is made and `Ok(())` is returned.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::HttpRequestFailed` when the request cannot be sent or when the
+    /// server answers with a non-success status.
+    pub async fn ingest(&self, messages: Vec<simd_json::OwnedValue>) -> Result<(), Error> {
+        let mut lines = Vec::with_capacity(messages.len());
+        for record in messages {
+            match simd_json::to_string(&record) {
+                Ok(line) => lines.push(line),
+                Err(error) => warn!(
+                    "Failed to serialize a message for index: {}, the message will be skipped. {error}",
+                    self.index_id
+                ),
+            }
+        }
+
+        if lines.is_empty() {
+            // Nothing to send; avoid posting an empty body.
+            return Ok(());
+        }
+
+        let messages_count = lines.len();
+        let url = format!(
+            "{}/api/v1/{}/ingest?commit=auto",
+            self.base_url(),
+            self.index_id
+        );
+        info!("Ingesting messages for index: {}...", self.index_id);
+
+        let response = self
+            .client
+            .post(&url)
+            .body(lines.join("\n"))
+            .send()
+            .await
+            .map_err(|error| {
+                error!(
+                    "Failed to send HTTP request to ingest messages for index: {}. {error}",
+                    self.index_id
+                );
+                Error::HttpRequestFailed(error.to_string())
+            })?;
+
+        let status = response.status();
+        if !status.is_success() {
+            let text = response.text().await.unwrap_or_default();
+            error!(
+                "Received an invalid HTTP response when ingesting messages for index: {}. Status code: {status}, reason: {text}",
+                self.index_id
+            );
+            return Err(Error::HttpRequestFailed(format!(
+                "Status code: {status}, reason: {text}"
+            )));
+        }
+
+        info!(
+            "Ingested {messages_count} messages for index: {}",
+            self.index_id
+        );
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Sink for QuickwitSink {
+    /// Logs the sink ID and URL, then creates the index unless it already exists.
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Initialized QuickwitSink with ID: {} for URL: {}",
+            self.id, self.config.url
+        );
+        let index_exists = self.has_index().await?;
+        if index_exists {
+            return Ok(());
+        }
+        self.create_index().await
+    }
+
+    /// Ingests the JSON payloads of the consumed messages.
+    ///
+    /// Messages with any other payload kind are skipped with a warning that names the
+    /// schema of the batch. When no JSON payload is left, nothing is sent.
+    async fn consume(
+        &self,
+        _topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        info!(
+            "Quickwit sink with ID: {} received: {} messages, format: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.schema
+        );
+
+        let json_payloads: Vec<_> = messages
+            .into_iter()
+            .filter_map(|message| match message.payload {
+                Payload::Json(value) => Some(value),
+                _ => {
+                    warn!("Unsupported payload format: {}", messages_metadata.schema);
+                    None
+                }
+            })
+            .collect();
+
+        if json_payloads.is_empty() {
+            Ok(())
+        } else {
+            self.ingest(json_payloads).await
+        }
+    }
+
+    /// Logs that the sink is shutting down.
+    async fn close(&mut self) {
+        info!("Quickwit sink with ID: {} is shutting down", self.id);
+    }
+}
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml 
b/core/connectors/sinks/stdout_sink/Cargo.toml
new file mode 100644
index 00000000..1251732a
--- /dev/null
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_stdout_sink"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+once_cell = { workspace = true }
+serde = { workspace = true }
+tracing = { workspace = true }
+
+[package.metadata.cargo-machete]
+ignored = ["dashmap", "once_cell"]
diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs 
b/core/connectors/sinks/stdout_sink/src/lib.rs
new file mode 100644
index 00000000..afa9df59
--- /dev/null
+++ b/core/connectors/sinks/stdout_sink/src/lib.rs
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, 
sink_connector,
+};
+use serde::{Deserialize, Serialize};
+use tracing::info;
+
+sink_connector!(StdoutSink);
+
+/// Sink connector that reports consumed messages through `tracing` log output.
+#[derive(Debug)]
+pub struct StdoutSink {
+    /// Numeric ID passed to `new`; used in log messages.
+    id: u32,
+    /// When `true`, every consumed message is logged with its offset and payload.
+    print_payload: bool,
+}
+
+/// Configuration of the stdout sink.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StdoutSinkConfig {
+    /// Whether to log each message payload; treated as `false` when absent.
+    print_payload: Option<bool>,
+}
+
+impl StdoutSink {
+    /// Creates a sink with the given ID; payload printing is disabled unless the
+    /// configuration enables it.
+    pub fn new(id: u32, config: StdoutSinkConfig) -> Self {
+        let print_payload = config.print_payload.unwrap_or_default();
+        Self { id, print_payload }
+    }
+}
+
+#[async_trait]
+impl Sink for StdoutSink {
+    /// Logs the sink ID and whether payload printing is enabled; always succeeds.
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Initialized stdout sink with ID: {}. Print payload: {}",
+            self.id, self.print_payload
+        );
+        Ok(())
+    }
+
+    /// Logs a summary of the received batch and, when `print_payload` is enabled, the
+    /// offset and pretty-printed payload of every message. Always succeeds.
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        // The `topic:` placeholder must be fed with the topic, not the stream a second time.
+        info!(
+            "Stdout sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.schema,
+            topic_metadata.stream,
+            topic_metadata.topic,
+            messages_metadata.partition_id,
+            messages_metadata.current_offset
+        );
+        if self.print_payload {
+            for message in messages {
+                info!(
+                    "Message offset: {}, payload: {:#?}",
+                    message.offset, message.payload
+                );
+            }
+        }
+        Ok(())
+    }
+
+    /// Logs that the sink is shutting down.
+    async fn close(&mut self) {
+        info!("Stdout sink with ID: {} is shutting down", self.id);
+    }
+}
diff --git a/core/connectors/sources/test_source/Cargo.toml 
b/core/connectors/sources/test_source/Cargo.toml
new file mode 100644
index 00000000..f1faec68
--- /dev/null
+++ b/core/connectors/sources/test_source/Cargo.toml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_test_source"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+humantime = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+once_cell = { workspace = true }
+rand = { workspace = true }
+serde = { workspace = true }
+simd-json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+
+[package.metadata.cargo-machete]
+ignored = ["dashmap", "once_cell"]
diff --git a/core/connectors/sources/test_source/src/lib.rs 
b/core/connectors/sources/test_source/src/lib.rs
new file mode 100644
index 00000000..9d8c4cf1
--- /dev/null
+++ b/core/connectors/sources/test_source/src/lib.rs
@@ -0,0 +1,157 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{str::FromStr, time::Duration};
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{ProducedMessage, ProducedMessages, Schema, Source, 
source_connector};
+use rand::{
+    Rng,
+    distr::{Alphanumeric, Uniform},
+};
+use serde::{Deserialize, Serialize};
+use tokio::{sync::Mutex, time::sleep};
+use tracing::info;
+
+source_connector!(TestSource);
+
+/// Source connector that periodically produces batches of generated JSON messages.
+#[derive(Debug)]
+pub struct TestSource {
+    /// Numeric ID passed to `new`; used in log messages.
+    id: u32,
+    /// Once this many messages have been generated, `poll` returns empty batches.
+    /// `None` disables the limit.
+    max_count: Option<usize>,
+    /// Delay awaited at the start of every `poll` call.
+    interval: Duration,
+    /// Bounds handed to `Uniform::new` when picking the size of each batch.
+    messages_range: (u32, u32),
+    /// Number of random alphanumeric characters placed in each record's `text` field.
+    payload_size: u32,
+    /// Mutable counter state, guarded by an async mutex because `poll` takes `&self`.
+    state: Mutex<State>,
+}
+
+/// Configuration of the test source; every field is optional.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TestSourceConfig {
+    /// Polling interval as a `humantime` duration string; "1s" is used when the value
+    /// is absent or cannot be parsed.
+    interval: Option<String>,
+    /// Optional limit on the number of generated messages.
+    max_count: Option<usize>,
+    /// Bounds for the number of messages per batch; `(10, 50)` when absent.
+    messages_range: Option<(u32, u32)>,
+    /// Length of the random text in each message; `100` when absent.
+    payload_size: Option<u32>,
+}
+
+/// Mutable state of the test source.
+#[derive(Debug)]
+struct State {
+    /// Total number of messages generated so far; starts at 0.
+    current_number: usize,
+}
+
+impl TestSource {
+    /// Creates a test source from its numeric ID and configuration.
+    ///
+    /// Missing settings fall back to: interval 1s, messages range `(10, 50)` and payload
+    /// size `100`. An interval that cannot be parsed is reported with a warning and
+    /// replaced by the 1s default instead of being ignored silently.
+    pub fn new(id: u32, config: TestSourceConfig) -> Self {
+        const DEFAULT_INTERVAL: Duration = Duration::from_secs(1);
+        let interval = match config.interval.as_deref() {
+            None => DEFAULT_INTERVAL,
+            Some(text) => match humantime::Duration::from_str(text) {
+                Ok(parsed) => *parsed,
+                Err(error) => {
+                    tracing::warn!(
+                        "Invalid interval: '{text}' for test source with ID {id}, using 1s instead. {error}"
+                    );
+                    DEFAULT_INTERVAL
+                }
+            },
+        };
+        TestSource {
+            id,
+            max_count: config.max_count,
+            interval,
+            messages_range: config.messages_range.unwrap_or((10, 50)),
+            payload_size: config.payload_size.unwrap_or(100),
+            state: Mutex::new(State { current_number: 0 }),
+        }
+    }
+
+    /// Generates one batch of messages with JSON-encoded `Record` payloads.
+    ///
+    /// The batch size is sampled from `Uniform::new(lower, upper)` over `messages_range`.
+    /// When the bounds do not form a valid range (for example `lower >= upper`), the
+    /// lower bound is used as the batch size instead of panicking.
+    fn generate_messages(&self) -> Vec<ProducedMessage> {
+        let (lower, upper) = self.messages_range;
+        let mut rng = rand::rng();
+        let messages_count = match Uniform::new(lower, upper) {
+            Ok(distribution) => rng.sample(distribution),
+            Err(_) => lower,
+        };
+        (0..messages_count)
+            .map(|_| {
+                let record = Record {
+                    title: "Hello".to_string(),
+                    name: "World".to_string(),
+                    age: 30,
+                    text: self.generate_random_text(),
+                };
+                ProducedMessage {
+                    id: None,
+                    headers: None,
+                    payload: simd_json::to_vec(&record)
+                        .expect("a record made of strings and an integer always serializes"),
+                }
+            })
+            .collect()
+    }
+
+    /// Returns a string of `payload_size` random alphanumeric characters.
+    fn generate_random_text(&self) -> String {
+        let mut rng = rand::rng();
+        (0..self.payload_size)
+            .map(|_| char::from(rng.sample(Alphanumeric)))
+            .collect()
+    }
+}
+
+#[async_trait]
+impl Source for TestSource {
+    /// Logs the effective settings of the source; always succeeds.
+    async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
+        // The logged value is `max_count` (a message count), so label it as such.
+        info!(
+            "Initialized test source with ID {}. Interval: {:#?}, max count: {:#?}, messages range: {} - {}, payload size: {}",
+            self.id,
+            self.interval,
+            self.max_count,
+            self.messages_range.0,
+            self.messages_range.1,
+            self.payload_size
+        );
+        Ok(())
+    }
+
+    /// Waits for `interval`, then returns a freshly generated batch of JSON messages.
+    ///
+    /// Once the number of generated messages has reached `max_count`, an empty batch is
+    /// returned instead. The limit is checked before a batch is generated, so the total
+    /// can exceed `max_count` by up to one batch.
+    async fn poll(&self) -> Result<ProducedMessages, iggy_connector_sdk::Error> {
+        sleep(self.interval).await;
+        let mut state = self.state.lock().await;
+        if let Some(max_count) = self.max_count {
+            if state.current_number >= max_count {
+                info!(
+                    "Reached max number of {max_count} messages for test source with ID {}",
+                    self.id
+                );
+                return Ok(ProducedMessages {
+                    schema: Schema::Json,
+                    messages: vec![],
+                });
+            }
+        }
+
+        let messages = self.generate_messages();
+        state.current_number += messages.len();
+        info!(
+            "Generated {} messages by test source with ID {}",
+            messages.len(),
+            self.id
+        );
+        Ok(ProducedMessages {
+            schema: Schema::Json,
+            messages,
+        })
+    }
+
+    /// Logs that the source is shutting down.
+    async fn close(&mut self) {
+        info!("Test source with ID {} is shutting down", self.id);
+    }
+}
+
+/// Shape of the JSON payload carried by every generated message.
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+    /// Always "Hello" in generated messages.
+    title: String,
+    /// Always "World" in generated messages.
+    name: String,
+    /// Always 30 in generated messages.
+    age: u32,
+    /// Random alphanumeric text of `payload_size` characters.
+    text: String,
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index e507ed47..0b4ee673 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -128,3 +128,6 @@ serial_test = { workspace = true }
 [[bin]]
 name = "iggy-server"
 path = "src/main.rs"
+
+[package.metadata.cargo-machete]
+ignored = ["rust-s3"]


Reply via email to