This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch connectors
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c32631cf51571ceba0f81ecec0f84391c43a4923
Author: spetz <[email protected]>
AuthorDate: Sat May 24 21:11:54 2025 +0200

    feat(repo): add connectors runtime
---
 Cargo.lock                                         | 372 +++++++++++++++++++++
 Cargo.toml                                         |  21 +-
 core/connectors/README.md                          |  23 ++
 core/connectors/data_producer/Cargo.toml           |  48 +++
 core/connectors/data_producer/src/main.rs          | 146 ++++++++
 core/connectors/docker-compose.yml                 |  38 +++
 core/connectors/runtime/Cargo.toml                 |  50 +++
 core/connectors/runtime/config.toml                | 131 ++++++++
 core/connectors/runtime/src/main.rs                | 361 ++++++++++++++++++++
 core/connectors/runtime/src/sink.rs                | 344 +++++++++++++++++++
 core/connectors/runtime/src/source.rs              | 299 +++++++++++++++++
 core/connectors/runtime/src/transform.rs           |  66 ++++
 core/connectors/sdk/Cargo.toml                     |  48 +++
 core/connectors/sdk/src/decoders/json.rs           |  37 ++
 core/connectors/sdk/src/decoders/mod.rs            |  21 ++
 core/connectors/sdk/src/decoders/raw.rs            |  31 ++
 core/connectors/sdk/src/decoders/text.rs           |  37 ++
 core/connectors/sdk/src/encoders/json.rs           |  39 +++
 core/connectors/sdk/src/encoders/mod.rs            |  21 ++
 core/connectors/sdk/src/encoders/raw.rs            |  34 ++
 core/connectors/sdk/src/encoders/text.rs           |  37 ++
 core/connectors/sdk/src/lib.rs                     | 239 +++++++++++++
 core/connectors/sdk/src/sink.rs                    | 240 +++++++++++++
 core/connectors/sdk/src/source.rs                  | 218 ++++++++++++
 core/connectors/sdk/src/transforms/add_fields.rs   | 128 +++++++
 .../connectors/sdk/src/transforms/delete_fields.rs |  61 ++++
 core/connectors/sdk/src/transforms/mod.rs          |  45 +++
 core/connectors/sinks/quickwit_sink/Cargo.toml     |  45 +++
 core/connectors/sinks/quickwit_sink/src/lib.rs     | 209 ++++++++++++
 core/connectors/sinks/stdout_sink/Cargo.toml       |  42 +++
 core/connectors/sinks/stdout_sink/src/lib.rs       |  88 +++++
 core/connectors/sources/test_source/Cargo.toml     |  47 +++
 core/connectors/sources/test_source/src/lib.rs     | 157 +++++++++
 33 files changed, 3720 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 96dee5e0..972240a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -420,6 +420,12 @@ version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
 
+[[package]]
+name = "arraydeque"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"
+
 [[package]]
 name = "arrayref"
 version = "0.3.9"
@@ -595,6 +601,15 @@ dependencies = [
  "bytemuck",
 ]
 
+[[package]]
+name = "atomic-polyfill"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4"
+dependencies = [
+ "critical-section",
+]
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -1012,6 +1027,9 @@ name = "bitflags"
 version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "bitvec"
@@ -1406,6 +1424,12 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "cobs"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15"
+
 [[package]]
 name = "colorchoice"
 version = "1.0.3"
@@ -1451,6 +1475,25 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "config"
+version = "0.15.11"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "595aae20e65c3be792d05818e8c63025294ac3cb7e200f11459063a352a6ef80"
+dependencies = [
+ "async-trait",
+ "convert_case 0.6.0",
+ "json5",
+ "pathdiff",
+ "ron",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "toml",
+ "winnow 0.7.10",
+ "yaml-rust2",
+]
+
 [[package]]
 name = "console-api"
 version = "0.8.1"
@@ -1620,6 +1663,12 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "critical-section"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b"
+
 [[package]]
 name = "crossbeam"
 version = "0.8.4"
@@ -1978,6 +2027,29 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "dlopen2"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9e1297103d2bbaea85724fcee6294c2d50b1081f9ad47d0f6f6f61eda65315a6"
+dependencies = [
+ "dlopen2_derive",
+ "libc",
+ "once_cell",
+ "winapi",
+]
+
+[[package]]
+name = "dlopen2_derive"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f2b99bf03862d7f545ebc28ddd33a665b50865f4dfd84031a393823879bd4c54"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "dlv-list"
 version = "0.5.2"
@@ -2038,6 +2110,18 @@ version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
 
+[[package]]
+name = "embedded-io"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced"
+
+[[package]]
+name = "embedded-io"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d"
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.35"
@@ -3016,6 +3100,16 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "halfbrown"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "aa2c385c6df70fd180bbb673d93039dbd2cd34e41d782600bdf6e1ca7bce39aa"
+dependencies = [
+ "hashbrown 0.15.3",
+ "serde",
+]
+
 [[package]]
 name = "handlebars"
 version = "4.5.0"
@@ -3030,6 +3124,15 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
+[[package]]
+name = "hash32"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67"
+dependencies = [
+ "byteorder",
+]
+
 [[package]]
 name = "hashbrown"
 version = "0.12.3"
@@ -3056,6 +3159,15 @@ dependencies = [
  "foldhash",
 ]
 
+[[package]]
+name = "hashlink"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
+dependencies = [
+ "hashbrown 0.15.3",
+]
+
 [[package]]
 name = "hdrhistogram"
 version = "7.5.4"
@@ -3069,6 +3181,20 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "heapless"
+version = "0.7.17"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f"
+dependencies = [
+ "atomic-polyfill",
+ "hash32",
+ "rustc_version",
+ "serde",
+ "spin",
+ "stable_deref_trait",
+]
+
 [[package]]
 name = "heck"
 version = "0.5.0"
@@ -3544,6 +3670,127 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "iggy_connector_data_producer"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "dashmap",
+ "iggy",
+ "once_cell",
+ "postcard",
+ "rand 0.9.1",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "serde_yml",
+ "simd-json",
+ "thiserror 2.0.12",
+ "tokio",
+ "toml",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
+name = "iggy_connector_quickwit_sink"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "iggy_connector_sdk",
+ "once_cell",
+ "postcard",
+ "reqwest",
+ "serde",
+ "serde_yml",
+ "simd-json",
+ "toml",
+ "tracing",
+]
+
+[[package]]
+name = "iggy_connector_runtime"
+version = "0.1.0"
+dependencies = [
+ "config",
+ "dashmap",
+ "dlopen2",
+ "flume",
+ "futures",
+ "iggy",
+ "iggy_binary_protocol",
+ "iggy_connector_sdk",
+ "mimalloc",
+ "once_cell",
+ "postcard",
+ "rand 0.9.1",
+ "serde",
+ "serde_json",
+ "simd-json",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
+name = "iggy_connector_sdk"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "dashmap",
+ "flume",
+ "iggy",
+ "iggy_binary_protocol",
+ "once_cell",
+ "postcard",
+ "serde",
+ "serde_json",
+ "simd-json",
+ "strum_macros",
+ "thiserror 2.0.12",
+ "tokio",
+ "tracing",
+ "tracing-subscriber",
+ "uuid",
+]
+
+[[package]]
+name = "iggy_connector_stdout_sink"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "iggy_connector_sdk",
+ "once_cell",
+ "postcard",
+ "serde",
+ "toml",
+ "tracing",
+]
+
+[[package]]
+name = "iggy_connector_test_source"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "dashmap",
+ "flume",
+ "humantime",
+ "iggy_connector_sdk",
+ "once_cell",
+ "postcard",
+ "rand 0.9.1",
+ "serde",
+ "simd-json",
+ "tokio",
+ "toml",
+ "tracing",
+]
+
 [[package]]
 name = "iggy_examples"
 version = "0.0.5"
@@ -3809,6 +4056,17 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "json5"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
+dependencies = [
+ "pest",
+ "pest_derive",
+ "serde",
+]
+
 [[package]]
 name = "jsonwebtoken"
 version = "9.3.1"
@@ -3983,6 +4241,16 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "libyml"
+version = "0.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980"
+dependencies = [
+ "anyhow",
+ "version_check",
+]
+
 [[package]]
 name = "libz-sys"
 version = "1.1.22"
@@ -4815,6 +5083,12 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "pathdiff"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
+
 [[package]]
 name = "pbkdf2"
 version = "0.12.2"
@@ -4991,6 +5265,19 @@ dependencies = [
  "portable-atomic",
 ]
 
+[[package]]
+name = "postcard"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "170a2601f67cc9dba8edd8c4870b15f71a6a2dc196daec8c83f72b59dff628a8"
+dependencies = [
+ "cobs",
+ "embedded-io 0.4.0",
+ "embedded-io 0.6.1",
+ "heapless",
+ "serde",
+]
+
 [[package]]
 name = "potential_utf"
 version = "0.1.2"
@@ -5445,6 +5732,26 @@ dependencies = [
  "thiserror 2.0.12",
 ]
 
+[[package]]
+name = "ref-cast"
+version = "1.0.24"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4a0ae411dbe946a674d89546582cea4ba2bb8defac896622d6496f14c23ba5cf"
+dependencies = [
+ "ref-cast-impl",
+]
+
+[[package]]
+name = "ref-cast-impl"
+version = "1.0.24"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "regex"
 version = "1.11.1"
@@ -5637,6 +5944,18 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "ron"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
+dependencies = [
+ "base64 0.21.7",
+ "bitflags 2.9.0",
+ "serde",
+ "serde_derive",
+]
+
 [[package]]
 name = "route-recognizer"
 version = "0.3.1"
@@ -6104,6 +6423,21 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "serde_yml"
+version = "0.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "59e2dd588bf1597a252c3b920e0143eb99b0f76e4e082f4c92ce34fbc9e71ddd"
+dependencies = [
+ "indexmap 2.9.0",
+ "itoa",
+ "libyml",
+ "memchr",
+ "ryu",
+ "serde",
+ "version_check",
+]
+
 [[package]]
 name = "serial_test"
 version = "3.2.0"
@@ -6253,6 +6587,21 @@ version = "0.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
 
+[[package]]
+name = "simd-json"
+version = "0.15.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c962f626b54771990066e5435ec8331d1462576cd2d1e62f24076ae014f92112"
+dependencies = [
+ "getrandom 0.3.3",
+ "halfbrown",
+ "ref-cast",
+ "serde",
+ "serde_json",
+ "simdutf8",
+ "value-trait",
+]
+
 [[package]]
 name = "simdutf8"
 version = "0.1.5"
@@ -7169,6 +7518,18 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
 
+[[package]]
+name = "value-trait"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0508fce11ad19e0aab49ce20b6bec7f8f82902ded31df1c9fc61b90f0eb396b8"
+dependencies = [
+ "float-cmp",
+ "halfbrown",
+ "itoa",
+ "ryu",
+]
+
 [[package]]
 name = "vcpkg"
 version = "0.2.15"
@@ -8036,6 +8397,17 @@ dependencies = [
  "lzma-sys",
 ]
 
+[[package]]
+name = "yaml-rust2"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "18b783b2c2789414f8bb84ca3318fc9c2d7e7be1c22907d37839a58dedb369d3"
+dependencies = [
+ "arraydeque",
+ "encoding_rs",
+ "hashlink",
+]
+
 [[package]]
 name = "yansi"
 version = "1.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 97035f30..3a4984f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,12 @@ members = [
     "core/binary_protocol",
     "core/cli",
     "core/common",
+    "core/connectors/data_producer",
+    "core/connectors/runtime",
+    "core/connectors/sdk",
+    "core/connectors/sinks/quickwit_sink",
+    "core/connectors/sinks/stdout_sink",
+    "core/connectors/sources/test_source",
     "core/examples",
     "core/integration",
     "core/sdk",
@@ -68,16 +74,18 @@ byte-unit = { version = "5.1.6", default-features = false, 
features = [
 ] }
 bytes = "1.10.1"
 charming = "0.4.0"
-chrono = "0.4.41"
+chrono = { version = "0.4.41", features = ["serde"] }
 clap = { version = "4.5.37", features = ["derive"] }
+config = { version = "0.15.11" }
 comfy-table = "7.1.4"
 crc32fast = "1.4.2"
 crossbeam = "0.8.4"
 dashmap = "6.1.0"
+derive_builder = "0.20.2"
 derive_more = { version = "2.0.1", features = ["full"] }
 derive-new = "0.7.0"
 dirs = "6.0.0"
-derive_builder = "0.20.2"
+dlopen2 = "0.7.0"
 enum_dispatch = "0.3.13"
 figlet-rs = "0.1.5"
 flume = "0.11.1"
@@ -87,9 +95,11 @@ human-repr = "1.1.0"
 humantime = "2.2.0"
 keyring = { version = "3.6.2", features = ["sync-secret-service", "vendored"] }
 nonzero_lit = "0.1.2"
+once_cell = "1.21.3"
 openssl = { version = "0.10.72", features = ["vendored"] }
 passterm = "=2.0.1"
 quinn = "0.11.8"
+postcard = { version = "1.1.1", features = ["alloc"] }
 rand = "0.9.1"
 reqwest = { version = "0.12.15", default-features = false, features = [
     "json",
@@ -101,7 +111,9 @@ rustls = { version = "0.23.27", features = ["ring"] }
 serde = { version = "1.0.219", features = ["derive", "rc"] }
 serde_json = "1.0.140"
 serde_with = { version = "3.12.0", features = ["base64", "macros"] }
+serde_yml = "0.0.12"
 serial_test = "3.2.0"
+simd-json = { version = "0.15.1", features = ["serde_impl"] }
 sysinfo = "0.35.0"
 tempfile = "3.19.1"
 thiserror = "2.0.12"
@@ -116,6 +128,7 @@ tracing-subscriber = { version = "0.3.19", default-features 
= false, features =
     "ansi",
 ] }
 uuid = { version = "1.16.0", features = [
+    "v4",
     "v7",
     "fast-rng",
     "serde",
@@ -123,6 +136,7 @@ uuid = { version = "1.16.0", features = [
 ] }
 rust-s3 = { version = "0.35.1", features = ["default"] }
 strum = { version = "0.27.1", features = ["derive"] }
+strum_macros = "0.27.1"
 aes-gcm = "0.10.3"
 base64 = "0.22.1"
 twox-hash = { version = "2.1.0", features = ["xxhash32"] }
@@ -151,8 +165,9 @@ mimalloc = "0.1"
 console-subscriber = "0.4.1"
 
 # Path dependencies
-iggy_common = { path = "core/common", version = "0.7.0" }
 iggy_binary_protocol = { path = "core/binary_protocol", version = "0.7.0" }
+iggy_common = { path = "core/common", version = "0.7.0" }
+iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" }
 iggy = { path = "core/sdk", version = "0.7.0" }
 server = { path = "core/server" }
 integration = { path = "core/integration" }
diff --git a/core/connectors/README.md b/core/connectors/README.md
new file mode 100644
index 00000000..0b362a55
--- /dev/null
+++ b/core/connectors/README.md
@@ -0,0 +1,23 @@
+# Apache Iggy Connectors
+
+This README is a work in progress. For now, you can do the following to test the connectors:
+
+1. Build the project in release mode, and make sure that the plugins specified in `core/connectors/runtime/config.toml` under `path` are available. The configuration file can use any of the `toml`, `json` or `yaml` formats.
+
+2. Run `docker compose up -d` from `/core/connectors` which will start the 
Quickwit server to be used by an example sink connector.
+
+3. Set environment variable 
`IGGY_CONNECTORS_RUNTIME_CONFIG_PATH=core/connectors/runtime/config` pointing 
to the runtime configuration file.
+
+4. Start the Iggy server and invoke the following commands to create the 
example streams and topics used by the connectors.
+
+```
+iggy --username iggy --password iggy stream create example
+iggy --username iggy --password iggy stream create qw
+iggy --username iggy --password iggy topic create qw records 1 none 1d
+```
+
+5. Execute `cargo r --bin iggy_connector_data_producer -r` which will start 
the example data producer application, sending the messages to previously 
created `qw` stream and `records` topic.
+
+6. Start the connector runtime `cargo r --bin iggy_connector_runtime -r` - you 
should be able to browse Quickwit UI at `http://localhost:7280` with records 
being constantly added to the `events` index (this is part of the Quickwit 
sink). At the same time, you should see the new messages being added to the 
`example` stream and `topic1` topic by the test source connector - you can use 
Iggy Web UI to browse the data.
+
+New connector can be built simply by implementing either `Sink` or `Source` 
trait. Please check the existing examples under `core/connectors/sinks` and 
`core/connectors/sources` directories.
diff --git a/core/connectors/data_producer/Cargo.toml 
b/core/connectors/data_producer/Cargo.toml
new file mode 100644
index 00000000..9789b88d
--- /dev/null
+++ b/core/connectors/data_producer/Cargo.toml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_data_producer"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[dependencies]
+iggy = { workspace = true }
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = { workspace = true }
+once_cell = { workspace = true }
+serde = { workspace = true }
+postcard = { workspace = true }
+rand = { workspace = true }
+reqwest = { workspace = true }
+serde_yml = { workspace = true }
+serde_json = { workspace = true }
+simd-json = { workspace = true }
+thiserror = { workspace = true }
+toml = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/connectors/data_producer/src/main.rs 
b/core/connectors/data_producer/src/main.rs
new file mode 100644
index 00000000..2607d955
--- /dev/null
+++ b/core/connectors/data_producer/src/main.rs
@@ -0,0 +1,146 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{env, str::FromStr, time::Duration};
+
+use chrono::{DateTime, Days, Utc};
+use iggy::prelude::{
+    Client, IggyClient, IggyClientBuilder, IggyDuration, IggyError, 
IggyMessage, Partitioning,
+};
+use rand::{
+    Rng,
+    distr::{Alphanumeric, Uniform},
+};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use tokio::time::sleep;
+use tracing::info;
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
+
+const SOURCES: [&str; 6] = ["browser", "mobile", "desktop", "email", 
"network", "other"];
+const STATES: [&str; 5] = ["active", "inactive", "blocked", "deleted", 
"unknown"];
+const DOMAINS: [&str; 5] = [
+    "gmail.com",
+    "yahoo.com",
+    "hotmail.com",
+    "outlook.com",
+    "aol.com",
+];
+
+#[tokio::main]
+async fn main() -> Result<(), DataProducerError> {
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
+        .init();
+    info!("Starting data producer...");
+    let address = 
env::var("IGGY_ADDRESS").unwrap_or("localhost:8090".to_owned());
+    let username = env::var("IGGY_USERNAME").unwrap_or("iggy".to_owned());
+    let password = env::var("IGGY_PASSWORD").unwrap_or("iggy".to_owned());
+    let stream = env::var("IGGY_STREAM").unwrap_or("qw".to_owned());
+    let topic = env::var("IGGY_TOPIC").unwrap_or("records".to_owned());
+    let client = create_client(&address, &username, &password).await?;
+    let mut producer = client
+        .producer(&stream, &topic)?
+        .batch_size(1000)
+        .send_interval(IggyDuration::from_str("5ms").unwrap())
+        .partitioning(Partitioning::balanced())
+        .build();
+    producer.init().await?;
+
+    let mut rng = rand::rng();
+    let mut batches_count = 0;
+    while batches_count < 100000 {
+        let records_count = rng.sample(Uniform::new(500u32, 1000).unwrap());
+        let messages = (0..records_count)
+            .map(|_| random_record())
+            .flat_map(|record| serde_json::to_string(&record).ok())
+            .flat_map(|payload| IggyMessage::from_str(&payload).ok())
+            .collect::<Vec<_>>();
+        producer.send(messages).await?;
+        info!("Sent {records_count} messages");
+        sleep(Duration::from_millis(10)).await;
+        batches_count += 1;
+    }
+
+    info!("Reached maximum batches count");
+    Ok(())
+}
+
+async fn create_client(
+    address: &str,
+    username: &str,
+    password: &str,
+) -> Result<IggyClient, IggyError> {
+    let connection_string = format!("iggy://{username}:{password}@{address}");
+    let client = 
IggyClientBuilder::from_connection_string(&connection_string)?.build()?;
+    client.connect().await?;
+    Ok(client)
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+    user_id: String,
+    user_type: u8,
+    email: String,
+    source: String,
+    state: String,
+    created_at: DateTime<Utc>,
+    message: String,
+}
+
+fn random_record() -> Record {
+    let mut rng = rand::rng();
+    let source =
+        SOURCES[rng.sample(Uniform::new(0u8, SOURCES.len() as u8).unwrap()) as 
usize].to_owned();
+    let state =
+        STATES[rng.sample(Uniform::new(0u8, STATES.len() as u8).unwrap()) as 
usize].to_owned();
+    let email = format!(
+        "{}@{}",
+        random_string(rng.sample(Uniform::new(3u32, 20).unwrap()) as usize),
+        DOMAINS[rng.sample(Uniform::new(0u8, DOMAINS.len() as u8).unwrap()) as 
usize]
+    );
+    let created_at = Utc::now()
+        .checked_sub_days(Days::new(rng.sample(Uniform::new(0u64, 
1000).unwrap())))
+        .unwrap();
+    Record {
+        user_id: format!("user_{}", rng.sample(Uniform::new(1u32, 
100).unwrap())),
+        user_type: rng.sample(Uniform::new(1u8, 5).unwrap()),
+        email,
+        source,
+        state,
+        message: random_string(rng.sample(Uniform::new(10u32, 100).unwrap()) 
as usize),
+        created_at,
+    }
+}
+
+fn random_string(size: usize) -> String {
+    let mut rng = rand::rng();
+    let text: String = (0..size)
+        .map(|_| rng.sample(Alphanumeric) as char)
+        .collect();
+    text
+}
+
+#[derive(Debug, Error)]
+enum DataProducerError {
+    #[error("Iggy client error")]
+    IggyClient(#[from] iggy::prelude::ClientError),
+    #[error("Iggy error")]
+    IggyError(#[from] iggy::prelude::IggyError),
+}
diff --git a/core/connectors/docker-compose.yml 
b/core/connectors/docker-compose.yml
new file mode 100644
index 00000000..5657e42e
--- /dev/null
+++ b/core/connectors/docker-compose.yml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+services:
+  quickwit:
+    image: quickwit/quickwit:edge
+    container_name: quickwit
+    restart: unless-stopped
+    volumes:
+      - quickwit:/quickwit/qwdata
+    ports:
+      - 7280:7280
+      - 7281:7281
+    command: run
+    environment:
+      - QW_ENABLE_OTLP_ENDPOINT=true
+      - QW_ENABLE_JAEGER_ENDPOINT=true
+      - QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER=true
+      - QW_ENABLE_INGEST_V2=true
+      - OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:7281
+      - 
RUST_LOG=info,quickwit_actors=error,indexing_split_store=error,tantivy=error,quickwit_serve=warn,quickwit_indexing=warn,quickwit=warn
+
+volumes:
+  quickwit:
diff --git a/core/connectors/runtime/Cargo.toml 
b/core/connectors/runtime/Cargo.toml
new file mode 100644
index 00000000..9e10a261
--- /dev/null
+++ b/core/connectors/runtime/Cargo.toml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_runtime"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[dependencies]
+config = { workspace = true }
+dashmap = { workspace = true }
+dlopen2 = { workspace = true }
+flume = { workspace = true }
+futures = { workspace = true }
+iggy = { workspace = true }
+iggy_binary_protocol = { workspace = true }
+iggy_connector_sdk = { workspace = true }
+mimalloc = { workspace = true }
+once_cell = { workspace = true }
+postcard = { workspace = true }
+rand = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+simd-json = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
diff --git a/core/connectors/runtime/config.toml 
b/core/connectors/runtime/config.toml
new file mode 100644
index 00000000..618120dd
--- /dev/null
+++ b/core/connectors/runtime/config.toml
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[iggy]
+address = "localhost:8090"
+username = "iggy"
+password = "iggy"
+token = "secret"
+
+[sources.test1]
+enabled = true
+name = "Test source"
+path = "target/release/libiggy_connector_test_source"
+
+[[sources.test1.streams]]
+stream = "example"
+topic = "topic1"
+schema = "json"
+batch_size = 1000
+send_interval = "5ms"
+
+[sources.test1.config]
+interval = "100ms"
+# max_count = 1000
+messages_range = [10, 50]
+payload_size = 200
+
+[sources.test1.transforms.add_fields]
+enabled = true
+
+[[sources.test1.transforms.add_fields.fields]]
+key = "test_field"
+value.static = "hello!"
+
+[sinks.quickwit]
+enabled = true
+name = "Quickwit sink 1"
+path = "target/release/libiggy_connector_quickwit_sink"
+
+[[sinks.quickwit.streams]]
+stream = "qw"
+topics = ["records"]
+schema = "json"
+batch_size = 1000
+poll_interval = "5ms"
+consumer_group = "qw_sink_connector"
+
+[sinks.quickwit.transforms.add_fields]
+enabled = true
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "service_name"
+value.static = "qw_connector"
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "timestamp"
+value.computed = "timestamp_millis"
+
+[[sinks.quickwit.transforms.add_fields.fields]]
+key = "random_id"
+value.computed = "uuid_v7"
+
+[sinks.quickwit.transforms.delete_fields]
+enabled = true
+fields = ["email", "created_at"]
+
+[sinks.quickwit.config]
+url = "http://localhost:7280"
+index = """
+version: 0.9
+
+index_id: events
+
+doc_mapping:
+  mode: strict
+  field_mappings:
+    - name: timestamp
+      type: datetime
+      input_formats: [unix_timestamp]
+      output_format: unix_timestamp_nanos
+      indexed: false
+      fast: true
+      fast_precision: milliseconds
+    - name: service_name
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: random_id
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: user_id
+      type: text
+      tokenizer: raw
+      fast: true
+    - name: user_type
+      type: u64
+      fast: true
+    - name: source
+      type: text
+      tokenizer: default
+    - name: state
+      type: text
+      tokenizer: default
+    - name: message
+      type: text
+      tokenizer: default
+
+  timestamp_field: timestamp
+
+indexing_settings:
+  commit_timeout_secs: 10
+
+retention:
+  period: 7 days
+  schedule: daily
+"""
diff --git a/core/connectors/runtime/src/main.rs 
b/core/connectors/runtime/src/main.rs
new file mode 100644
index 00000000..99fe9eb6
--- /dev/null
+++ b/core/connectors/runtime/src/main.rs
@@ -0,0 +1,361 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use config::{Config, Environment, File};
+use dlopen2::wrapper::{Container, WrapperApi};
+use iggy::prelude::{Client, IggyClient, IggyClientBuilder, IggyConsumer, 
IggyError, IggyProducer};
+use iggy_connector_sdk::{
+    Schema, StreamDecoder, StreamEncoder,
+    sink::ConsumeCallback,
+    source::{HandleCallback, SendCallback},
+    transforms::{Transform, TransformType},
+};
+use mimalloc::MiMalloc;
+use serde::{Deserialize, Serialize};
+use std::{
+    collections::HashMap,
+    env,
+    sync::{Arc, atomic::AtomicU32},
+};
+use thiserror::Error;
+use tracing::{error, info};
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, 
util::SubscriberInitExt};
+
+mod sink;
+mod source;
+mod transform;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+#[derive(WrapperApi)]
+pub struct SinkApi {
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    #[allow(clippy::too_many_arguments)]
+    consume: extern "C" fn(
+        id: u32,
+        topic_meta_ptr: *const u8,
+        topic_meta_len: usize,
+        messages_meta_ptr: *const u8,
+        messages_meta_len: usize,
+        messages_ptr: *const u8,
+        messages_len: usize,
+    ) -> i32,
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+#[derive(WrapperApi)]
+struct SourceApi {
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> 
i32,
+    handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
+
+struct SinkConnector {
+    container: Container<SinkApi>,
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+struct SinkConnectorPlugin {
+    id: u32,
+    consumers: Vec<SinkConnectorConsumer>,
+}
+
+struct SinkConnectorConsumer {
+    batch_size: u32,
+    consumer: IggyConsumer,
+    decoder: Arc<dyn StreamDecoder>,
+    transforms: Vec<Arc<dyn Transform>>,
+}
+
+struct SinkConnectorWrapper {
+    callback: ConsumeCallback,
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+struct SinkWithPlugins {
+    container: Container<SinkApi>,
+    plugin_ids: Vec<u32>,
+}
+
+struct SourceConnector {
+    container: Container<SourceApi>,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+struct SourceConnectorPlugin {
+    id: u32,
+    transforms: Vec<Arc<dyn Transform>>,
+    producer: Option<SourceConnectorProducer>,
+}
+
+struct SourceConnectorProducer {
+    encoder: Arc<dyn StreamEncoder>,
+    producer: IggyProducer,
+}
+
+struct SourceWithPlugins {
+    container: Container<SourceApi>,
+    plugin_ids: Vec<u32>,
+}
+
+struct SourceConnectorWrapper {
+    callback: HandleCallback,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), RuntimeError> {
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        
.with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")))
+        .init();
+    let config_path =
+        env::var("IGGY_CONNECTORS_RUNTIME_CONFIG_PATH").unwrap_or_else(|_| 
"config".to_string());
+    info!("Starting Iggy Connector Runtime, loading configuration from: 
{config_path}...");
+    let builder = Config::builder()
+        .add_source(File::with_name(&config_path))
+        .add_source(Environment::with_prefix("IGGY").separator("_"));
+
+    let config: RuntimeConfig = builder
+        .build()
+        .expect("Failed to build config")
+        .try_deserialize()
+        .expect("Failed to deserialize config");
+
+    let iggy_address = config.iggy.address;
+    let iggy_username = config.iggy.username.expect("Iggy username must be 
set");
+    let iggy_password = config.iggy.password.expect("Iggy password must be 
set");
+    let consumer_client = create_client(&iggy_address, &iggy_username, 
&iggy_password).await?;
+    consumer_client.connect().await?;
+    let producer_client = create_client(&iggy_address, &iggy_username, 
&iggy_password).await?;
+    producer_client.connect().await?;
+
+    let sources = source::init(config.sources, &producer_client).await?;
+    let sinks = sink::init(config.sinks, &consumer_client).await?;
+
+    let mut sink_wrappers = vec![];
+    let mut sink_with_plugins = HashMap::new();
+    for (key, sink) in sinks {
+        let plugin_ids = sink.plugins.iter().map(|plugin| plugin.id).collect();
+        sink_wrappers.push(SinkConnectorWrapper {
+            callback: sink.container.consume,
+            plugins: sink.plugins,
+        });
+        sink_with_plugins.insert(
+            key,
+            SinkWithPlugins {
+                container: sink.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    let mut source_wrappers = vec![];
+    let mut source_with_plugins = HashMap::new();
+    for (key, source) in sources {
+        let plugin_ids = source.plugins.iter().map(|plugin| 
plugin.id).collect();
+        source_wrappers.push(SourceConnectorWrapper {
+            callback: source.container.handle,
+            plugins: source.plugins,
+        });
+        source_with_plugins.insert(
+            key,
+            SourceWithPlugins {
+                container: source.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    source::handle(source_wrappers);
+    sink::consume(sink_wrappers);
+    info!("All sources and sinks spawned.");
+
+    #[cfg(unix)]
+    let (mut ctrl_c, mut sigterm) = {
+        use tokio::signal::unix::{SignalKind, signal};
+        (
+            signal(SignalKind::interrupt()).expect("Failed to create SIGINT 
signal"),
+            signal(SignalKind::terminate()).expect("Failed to create SIGTERM 
signal"),
+        )
+    };
+
+    #[cfg(unix)]
+    tokio::select! {
+        _ = ctrl_c.recv() => {
+            info!("Received SIGINT. Shutting down connectors...");
+        },
+        _ = sigterm.recv() => {
+            info!("Received SIGTERM. Shutting down connectors...");
+        }
+    }
+
+    for (key, source) in source_with_plugins {
+        for plugin_id in source.plugin_ids {
+            info!("Closing source connector with ID: {plugin_id} for plugin: 
{key}");
+            source.container.close(plugin_id);
+            info!("Closed source connector with ID: {plugin_id} for plugin: 
{key}");
+        }
+    }
+
+    for (key, sink) in sink_with_plugins {
+        for plugin_id in sink.plugin_ids {
+            info!("Closing sink connector with ID: {plugin_id} for plugin: 
{key}",);
+            sink.container.close(plugin_id);
+            info!("Closed sink connector with ID: {plugin_id} for plugin: 
{key}");
+        }
+    }
+
+    producer_client.shutdown().await?;
+    consumer_client.shutdown().await?;
+
+    info!("All connectors closed.");
+    Ok(())
+}
+
+async fn create_client(
+    address: &str,
+    username: &str,
+    password: &str,
+) -> Result<IggyClient, IggyError> {
+    let connection_string = format!("iggy://{username}:{password}@{address}");
+    let client = 
IggyClientBuilder::from_connection_string(&connection_string)?.build()?;
+    client.connect().await?;
+    Ok(client)
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct Record {
+    title: String,
+    name: String,
+    age: u32,
+    text: String,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct RuntimeConfig {
+    iggy: IggyConfig,
+    sinks: HashMap<String, SinkConfig>,
+    sources: HashMap<String, SourceConfig>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct IggyConfig {
+    address: String,
+    username: Option<String>,
+    password: Option<String>,
+    token: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct SinkConfig {
+    enabled: bool,
+    name: String,
+    path: String,
+    transforms: Option<TransformsConfig>,
+    streams: Vec<StreamConsumerConfig>,
+    config: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamConsumerConfig {
+    stream: String,
+    topics: Vec<String>,
+    schema: Schema,
+    batch_size: Option<u32>,
+    poll_interval: Option<String>,
+    consumer_group: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamProducerConfig {
+    stream: String,
+    topic: String,
+    schema: Schema,
+    batch_size: Option<u32>,
+    send_interval: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct SourceConfig {
+    enabled: bool,
+    name: String,
+    path: String,
+    transforms: Option<TransformsConfig>,
+    streams: Vec<StreamProducerConfig>,
+    config: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct TransformsConfig {
+    #[serde(flatten)]
+    transforms: HashMap<TransformType, serde_json::Value>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct InitialTransformConfig {
+    enabled: bool,
+}
+
+#[derive(Debug, Error)]
+pub enum RuntimeError {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    #[error("Failed to serialize topic metadata")]
+    FailedToSerializeTopicMetadata,
+    #[error("Failed to serialize messages metadata")]
+    FailedToSerializeMessagesMetadata,
+    #[error("Failed to serialize raw messages")]
+    FailedToSerializeRawMessages,
+    #[error("Failed to serialize headers")]
+    FailedToSerializeHeaders,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("Invalid transform")]
+    InvalidTransform,
+    #[error("Invalid sink")]
+    InvalidSink,
+    #[error("Connector SDK error")]
+    ConnectorSdkError(#[from] iggy_connector_sdk::Error),
+    #[error("Iggy client error")]
+    IggyClient(#[from] iggy::prelude::ClientError),
+    #[error("Iggy error")]
+    IggyError(#[from] iggy::prelude::IggyError),
+}
+
+const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
+
+pub fn resolve_plugin_path(path: &str) -> String {
+    let extension = path.split('.').next_back().unwrap_or_default();
+    if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) {
+        path.to_string()
+    } else {
+        let os_extension = match std::env::consts::OS {
+            "windows" => "dll",
+            "macos" => "dylib",
+            _ => "so",
+        };
+
+        format!("{path}.{os_extension}")
+    }
+}
diff --git a/core/connectors/runtime/src/sink.rs 
b/core/connectors/runtime/src/sink.rs
new file mode 100644
index 00000000..fd73fd4a
--- /dev/null
+++ b/core/connectors/runtime/src/sink.rs
@@ -0,0 +1,344 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dlopen2::wrapper::Container;
+use futures::StreamExt;
+use iggy::prelude::{
+    AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, 
IggyMessage,
+    PollingStrategy,
+};
+use iggy_connector_sdk::{
+    DecodedMessage, MessagesMetadata, RawMessage, RawMessages, 
ReceivedMessage, StreamDecoder,
+    TopicMetadata, sink::ConsumeCallback, transforms::Transform,
+};
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+    time::Instant,
+};
+use tracing::{error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SinkApi, SinkConfig, SinkConnector, 
SinkConnectorConsumer,
+    SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform,
+};
+
+pub async fn init(
+    sink_configs: HashMap<String, SinkConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
+    let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
+    for (key, config) in sink_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Sink: {name} is disabled ({key})");
+            continue;
+        }
+
+        let plugin_id = PLUGIN_ID.load(Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        info!("Initializing sink container with name: {name} ({key}), plugin: 
{path}",);
+        if let Some(container) = sink_connectors.get_mut(&path) {
+            info!("Sink container for plugin: {path} is already loaded.",);
+            init_sink(
+                &container.container,
+                &config.config.unwrap_or_default(),
+                plugin_id,
+            );
+            container.plugins.push(SinkConnectorPlugin {
+                id: plugin_id,
+                consumers: vec![],
+            });
+        } else {
+            let container: Container<SinkApi> =
+                unsafe { Container::load(&path).expect("Failed to load sink 
container") };
+            info!("Sink container for plugin: {path} loaded successfully.",);
+            init_sink(&container, &config.config.unwrap_or_default(), 
plugin_id);
+            sink_connectors.insert(
+                path.to_owned(),
+                SinkConnector {
+                    container,
+                    plugins: vec![SinkConnectorPlugin {
+                        id: plugin_id,
+                        consumers: vec![],
+                    }],
+                },
+            );
+        }
+
+        info!(
+            "Sink container with name: {name} ({key}), initialized 
successfully with ID: {plugin_id}."
+        );
+        PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+
+        let transforms = if let Some(transforms_config) = config.transforms {
+            let transforms = transform::load(transforms_config).expect("Failed 
to load transforms");
+            let types = transforms
+                .iter()
+                .map(|t| t.r#type().into())
+                .collect::<Vec<&'static str>>()
+                .join(", ");
+            info!("Enabled transforms for sink: {name} ({key}): {types}",);
+            transforms
+        } else {
+            vec![]
+        };
+
+        let connector = sink_connectors
+            .get_mut(&path)
+            .expect("Failed to get sink connector");
+        let plugin = connector
+            .plugins
+            .iter_mut()
+            .find(|p| p.id == plugin_id)
+            .expect("Failed to get sink plugin");
+
+        for stream in config.streams {
+            let poll_interval =
+                
IggyDuration::from_str(&stream.poll_interval.unwrap_or("5ms".to_owned()))
+                    .expect("Invalid poll interval");
+            let consumer_group = stream
+                .consumer_group
+                .unwrap_or(format!("iggy-connect-{key}"));
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            for topic in stream.topics {
+                let mut consumer = iggy_client
+                    .consumer_group(&consumer_group, &stream.stream, &topic)?
+                    
.auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                    .create_consumer_group_if_not_exists()
+                    .auto_join_consumer_group()
+                    .polling_strategy(PollingStrategy::next())
+                    .poll_interval(poll_interval)
+                    .batch_size(batch_size)
+                    .build();
+
+                consumer.init().await?;
+                plugin.consumers.push(SinkConnectorConsumer {
+                    consumer,
+                    decoder: stream.schema.decoder(),
+                    batch_size,
+                    transforms: transforms.clone(),
+                });
+            }
+        }
+    }
+
+    Ok(sink_connectors)
+}
+
+pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
+    for sink in sinks {
+        for plugin in sink.plugins {
+            info!("Starting consume for sink with ID: {}...", plugin.id);
+            for consumer in plugin.consumers {
+                tokio::spawn(async move {
+                    consume_messages(
+                        plugin.id,
+                        consumer.decoder,
+                        consumer.batch_size,
+                        sink.callback,
+                        consumer.transforms,
+                        consumer.consumer,
+                    )
+                    .await
+                    .expect("Failed to consume messages")
+                });
+            }
+            info!(
+                "Consume for sink with ID: {} started successfully.",
+                plugin.id
+            );
+        }
+    }
+}
+
+async fn consume_messages(
+    plugin_id: u32,
+    decoder: Arc<dyn StreamDecoder>,
+    batch_size: u32,
+    consume: ConsumeCallback,
+    transforms: Vec<Arc<dyn Transform>>,
+    mut consumer: IggyConsumer,
+) -> Result<(), RuntimeError> {
+    info!("Started consuming messages for plugin with ID: {plugin_id}");
+    let batch_size = batch_size as usize;
+
+    let mut batch = Vec::with_capacity(batch_size);
+    let topic_metadata = TopicMetadata {
+        stream: consumer.stream().to_string(),
+        topic: consumer.topic().to_string(),
+    };
+
+    while let Some(message) = consumer.next().await {
+        let Ok(message) = message else {
+            error!("Failed to receive message.");
+            continue;
+        };
+
+        let partition_id = message.partition_id;
+        let current_offset = message.current_offset;
+        let message_offset = message.message.header.offset;
+        batch.push(message.message);
+        if current_offset != message_offset && batch.len() < batch_size {
+            continue;
+        }
+
+        let messages = std::mem::take(&mut batch);
+        let messages_count = messages.len();
+        let messages_metadata = MessagesMetadata {
+            partition_id,
+            current_offset,
+            schema: decoder.schema(),
+        };
+        info!("Processing {messages_count} messages");
+        let start = Instant::now();
+        if let Err(error) = process_messages(
+            plugin_id,
+            messages_metadata,
+            &topic_metadata,
+            messages,
+            &consume,
+            &transforms,
+            &decoder,
+        )
+        .await
+        {
+            error!("Failed to process {messages_count} messages. {error}");
+            continue;
+        }
+
+        let elapsed = start.elapsed();
+        info!("Consumed {messages_count} messages in {:#?}", elapsed);
+    }
+    info!("Stopped consuming messages for plugin with ID: {plugin_id}");
+    Ok(())
+}
+
+async fn process_messages(
+    plugin_id: u32,
+    messages_metadata: MessagesMetadata,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<IggyMessage>,
+    consume: &ConsumeCallback,
+    transforms: &Vec<Arc<dyn Transform>>,
+    decoder: &Arc<dyn StreamDecoder>,
+) -> Result<(), RuntimeError> {
+    let messages = messages.into_iter().map(|message| ReceivedMessage {
+        id: message.header.id,
+        offset: message.header.offset,
+        headers: message.user_headers_map().unwrap_or_default(),
+        payload: message.payload.into(),
+    });
+
+    let count = messages.len();
+    let decoded_messages = messages.into_iter().flat_map(|message| {
+        let Ok(payload) = decoder.decode(message.payload) else {
+            return None;
+        };
+
+        Some(DecodedMessage {
+            id: Some(message.id),
+            offset: Some(message.offset),
+            headers: message.headers,
+            payload,
+        })
+    });
+    let mut raw_messages = Vec::with_capacity(count);
+    for message in decoded_messages {
+        let mut current_message = Some(message);
+        for transform in transforms.iter() {
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let Some(offset) = message.offset else {
+            error!("Offset should be present.");
+            continue;
+        };
+
+        let Ok(payload) = message.payload.try_into_vec() else {
+            error!("Failed to get raw payload.");
+            continue;
+        };
+
+        let headers: Result<Vec<u8>, RuntimeError> = if let Some(headers) = 
message.headers {
+            Ok(postcard::to_allocvec(&headers).map_err(|error| {
+                error!("Failed to serialize headers. {error}");
+                RuntimeError::FailedToSerializeHeaders
+            })?)
+        } else {
+            Ok(vec![])
+        };
+
+        let Ok(headers) = headers else {
+            error!("Failed to serialize headers.");
+            continue;
+        };
+
+        raw_messages.push(RawMessage {
+            offset,
+            headers,
+            payload,
+        });
+    }
+
+    let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| {
+        error!("Failed to serialize topic metadata. {error}");
+        RuntimeError::FailedToSerializeTopicMetadata
+    })?;
+
+    let messages_meta = 
postcard::to_allocvec(&messages_metadata).map_err(|error| {
+        error!("Failed to serialize messages metadata. {error}");
+        RuntimeError::FailedToSerializeMessagesMetadata
+    })?;
+
+    let messages = postcard::to_allocvec(&RawMessages {
+        schema: decoder.schema(),
+        messages: raw_messages,
+    })
+    .map_err(|error| {
+        error!("Failed to serialize raw messages. {error}");
+        RuntimeError::FailedToSerializeRawMessages
+    })?;
+
+    (consume)(
+        plugin_id,
+        topic_meta.as_ptr(),
+        topic_meta.len(),
+        messages_meta.as_ptr(),
+        messages_meta.len(),
+        messages.as_ptr(),
+        messages.len(),
+    );
+
+    Ok(())
+}
+
+fn init_sink(container: &Container<SinkApi>, config: &serde_json::Value, id: 
u32) {
+    let config = serde_json::to_string(config).expect("Invalid sink config.");
+    (container.open)(id, config.as_ptr(), config.len());
+}
diff --git a/core/connectors/runtime/src/source.rs 
b/core/connectors/runtime/src/source.rs
new file mode 100644
index 00000000..ac5be012
--- /dev/null
+++ b/core/connectors/runtime/src/source.rs
@@ -0,0 +1,299 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dashmap::DashMap;
+use dlopen2::wrapper::Container;
+use flume::{Receiver, Sender};
+use iggy::prelude::{HeaderKey, HeaderValue, IggyClient, IggyDuration, 
IggyError, IggyMessage};
+use iggy_connector_sdk::{
+    DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, 
transforms::Transform,
+};
+use once_cell::sync::Lazy;
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+};
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SourceApi, SourceConfig, SourceConnector, 
SourceConnectorPlugin,
+    SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path, 
transform,
+};
+
+/// Channels for messages produced by source plugins, keyed by plugin ID.
+/// The FFI callback (`handle_produced_messages`) is the producing side; the
+/// async task spawned in `handle` is the consuming side.
+pub static SOURCE_SENDERS: Lazy<DashMap<u32, Sender<ProducedMessages>>> = 
Lazy::new(DashMap::new);
+
+/// Loads and initializes the configured source connectors.
+///
+/// Each enabled source receives a unique plugin ID (from the shared
+/// `PLUGIN_ID` counter); a plugin library is loaded only once per path and
+/// reused when several sources share the same binary. Returns the connectors
+/// keyed by resolved plugin path.
+pub async fn init(
+    source_configs: HashMap<String, SourceConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
+    let mut source_connectors: HashMap<String, SourceConnector> = 
HashMap::new();
+    for (key, config) in source_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Source: {name} is disabled ({key})");
+            continue;
+        }
+
+        let plugin_id = PLUGIN_ID.load(Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        info!("Initializing source container with name: {name} ({key}), 
plugin: {path}",);
+        if let Some(container) = source_connectors.get_mut(&path) {
+            // Library already loaded: only register a new plugin instance.
+            info!("Source container for plugin: {path} is already loaded.",);
+            init_source(
+                &container.container,
+                &config.config.unwrap_or_default(),
+                plugin_id,
+            );
+            container.plugins.push(SourceConnectorPlugin {
+                id: plugin_id,
+                producer: None,
+                transforms: vec![],
+            });
+        } else {
+            // SAFETY-sensitive: loading an arbitrary dynamic library runs its
+            // initialization code; the plugin path comes from the runtime config.
+            let container: Container<SourceApi> =
+                unsafe { Container::load(&path).expect("Failed to load source 
container") };
+            info!("Source container for plugin: {path} loaded successfully.",);
+            init_source(&container, &config.config.unwrap_or_default(), 
plugin_id);
+            source_connectors.insert(
+                path.to_owned(),
+                SourceConnector {
+                    container,
+                    plugins: vec![SourceConnectorPlugin {
+                        id: plugin_id,
+                        producer: None,
+                        transforms: vec![],
+                    }],
+                },
+            );
+        }
+
+        info!(
+            "Source container with name: {name} ({key}), initialized 
successfully with ID: {plugin_id}."
+        );
+        PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+
+        // Build the (possibly empty) transform chain for this source.
+        let transforms = if let Some(transforms_config) = config.transforms {
+            let transforms = transform::load(transforms_config).expect("Failed 
to load transforms");
+            let types = transforms
+                .iter()
+                .map(|t| t.r#type().into())
+                .collect::<Vec<&'static str>>()
+                .join(", ");
+            info!("Enabled transforms for source: {name} ({key}): {types}",);
+            transforms
+        } else {
+            vec![]
+        };
+
+        // Re-borrow the plugin entry registered above to attach its producer.
+        let connector = source_connectors
+            .get_mut(&path)
+            .expect("Failed to get source connector")
+        let plugin = connector
+            .plugins
+            .iter_mut()
+            .find(|p| p.id == plugin_id)
+            .expect("Failed to get source plugin");
+
+        // NOTE(review): the plugin holds a single producer, so when multiple
+        // streams are configured each iteration overwrites the previous one
+        // and only the last stream's producer is kept — confirm intended.
+        for stream in config.streams {
+            let send_interval =
+                
IggyDuration::from_str(&stream.send_interval.unwrap_or("5ms".to_owned()))
+                    .expect("Invalid send interval");
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            let mut producer = iggy_client
+                .producer(&stream.stream, &stream.topic)?
+                .send_interval(send_interval)
+                .batch_size(batch_size)
+                .build();
+
+            producer.init().await?;
+            plugin.producer = Some(SourceConnectorProducer {
+                producer,
+                encoder: stream.schema.encoder(),
+            });
+            plugin.transforms = transforms.clone();
+        }
+    }
+
+    Ok(source_connectors)
+}
+
+/// Starts every source connector plugin: a blocking task that drives the
+/// plugin's FFI callback, plus an async task that receives the produced
+/// messages, decodes/transforms/encodes them, and sends them to Iggy via the
+/// plugin's configured producer.
+pub fn handle(sources: Vec<SourceConnectorWrapper>) {
+    for source in sources {
+        for plugin in source.plugins {
+            let plugin_id = plugin.id;
+            info!("Starting handler for source with ID: {plugin_id}...");
+            let handle = source.callback;
+            // The plugin callback may block indefinitely, so it must not run
+            // on the async executor threads.
+            tokio::task::spawn_blocking(move || {
+                handle(plugin_id, handle_produced_messages);
+            });
+            info!("Handler for source with ID: {plugin_id} started successfully.");
+
+            // Channel bridging the FFI callback (see `handle_produced_messages`)
+            // with the async consumer spawned below.
+            let (sender, receiver): (Sender<ProducedMessages>, Receiver<ProducedMessages>) =
+                flume::unbounded();
+            SOURCE_SENDERS.insert(plugin_id, sender);
+            tokio::spawn(async move {
+                info!("Source #{plugin_id} started");
+                let producer = &plugin.producer.expect("Producer not initialized");
+                let encoder = producer.encoder.clone();
+                let producer = &producer.producer;
+                let mut number = 1;
+
+                let topic_metadata = TopicMetadata {
+                    stream: producer.stream().to_string(),
+                    topic: producer.topic().to_string(),
+                };
+
+                // Runs until the sender side is dropped (plugin shut down).
+                while let Ok(received_messages) = receiver.recv_async().await {
+                    let count = received_messages.messages.len();
+                    // Fixed unbalanced bracket in the original log message.
+                    info!("[Source #{plugin_id}] received {count} messages");
+                    let schema = received_messages.schema;
+                    let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count);
+                    for message in received_messages.messages {
+                        // Undecodable payloads are skipped, not fatal.
+                        let Ok(payload) = schema.try_into_payload(message.payload) else {
+                            error!(
+                                "Failed to decode message payload with schema: {}",
+                                received_messages.schema
+                            );
+                            continue;
+                        };
+
+                        debug!(
+                            "[Source #{plugin_id}] Message: {number} | Schema: {schema} | Payload: {payload}"
+                        );
+                        messages.push(DecodedMessage {
+                            id: message.id,
+                            offset: None,
+                            headers: message.headers,
+                            payload,
+                        });
+                        number += 1;
+                    }
+
+                    let Ok(iggy_messages) =
+                        process_messages(&encoder, &topic_metadata, messages, &plugin.transforms)
+                    else {
+                        error!(
+                            "Failed to process {count} messages before sending them to stream: {}, topic: {}.",
+                            producer.stream(),
+                            producer.topic()
+                        );
+                        continue;
+                    };
+
+                    if let Err(error) = producer.send(iggy_messages).await {
+                        error!(
+                            "Failed to send {count} messages to stream: {}, topic: {}. {error}",
+                            producer.stream(),
+                            producer.topic(),
+                        );
+                        continue;
+                    }
+
+                    info!(
+                        "Sent {count} messages to stream: {}, topic: {}",
+                        producer.stream(),
+                        producer.topic()
+                    );
+                }
+            });
+        }
+    }
+}
+
+/// Runs each decoded message through the transform chain, encodes the result,
+/// and builds the Iggy messages to send.
+///
+/// Messages dropped by a transform, or that fail to encode or build, are
+/// skipped; only a transform error aborts the whole batch.
+/// The transforms parameter is a slice (not `&Vec`) per idiomatic Rust —
+/// callers passing `&Vec<_>` still work via deref coercion.
+fn process_messages(
+    encoder: &Arc<dyn StreamEncoder>,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<DecodedMessage>,
+    transforms: &[Arc<dyn Transform>],
+) -> Result<Vec<IggyMessage>, Error> {
+    let mut iggy_messages = Vec::with_capacity(messages.len());
+    for message in messages {
+        let mut current_message = Some(message);
+        for transform in transforms.iter() {
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let Ok(payload) = encoder.encode(message.payload) else {
+            error!("Failed to encode message payload");
+            continue;
+        };
+
+        let Ok(iggy_message) = build_iggy_message(payload, message.id, message.headers) else {
+            error!("Failed to build Iggy message");
+            continue;
+        };
+
+        iggy_messages.push(iggy_message);
+    }
+    Ok(iggy_messages)
+}
+
+/// Passes the serialized JSON configuration to the source plugin's `open` entry point.
+fn init_source(container: &Container<SourceApi>, config: &serde_json::Value, id: u32) {
+    let serialized = serde_json::to_string(config).expect("Invalid source config.");
+    // `serialized` stays alive for the duration of the FFI call, so the
+    // pointer/length pair handed to the plugin remains valid.
+    (container.open)(id, serialized.as_ptr(), serialized.len());
+}
+
+extern "C" fn handle_produced_messages(
+    plugin_id: u32,
+    messages_ptr: *const u8,
+    messages_len: usize,
+) {
+    unsafe {
+        if let Some(sender) = SOURCE_SENDERS.get(&plugin_id) {
+            let messages = std::slice::from_raw_parts(messages_ptr, 
messages_len);
+            let Ok(messages) = 
postcard::from_bytes::<ProducedMessages>(messages) else {
+                eprintln!("Failed to deserialize produced messages.");
+                return;
+            };
+            let _ = sender.send(messages);
+        }
+    }
+}
+
+/// Builds an `IggyMessage` from an encoded payload, applying the optional
+/// message ID and user headers only when they are present.
+fn build_iggy_message(
+    payload: Vec<u8>,
+    id: Option<u128>,
+    headers: Option<HashMap<HeaderKey, HeaderValue>>,
+) -> Result<IggyMessage, IggyError> {
+    // The payload is common to every combination; only the optional parts
+    // vary per match arm.
+    let builder = IggyMessage::builder().payload(payload.into());
+    match (id, headers) {
+        (Some(id), Some(headers)) => builder.id(id).user_headers(headers).build(),
+        (Some(id), None) => builder.id(id).build(),
+        (None, Some(headers)) => builder.user_headers(headers).build(),
+        (None, None) => builder.build(),
+    }
+}
diff --git a/core/connectors/runtime/src/transform.rs 
b/core/connectors/runtime/src/transform.rs
new file mode 100644
index 00000000..2417fbfa
--- /dev/null
+++ b/core/connectors/runtime/src/transform.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use iggy_connector_sdk::transforms::{
+    Transform, TransformType, add_fields::AddFields, 
delete_fields::DeleteFields,
+};
+use serde::de::DeserializeOwned;
+use tracing::error;
+
+use crate::{InitialTransformConfig, RuntimeError, TransformsConfig};
+
+/// Loads the enabled transforms from the provided configuration, preserving
+/// their configuration order. Disabled transforms are skipped silently.
+///
+/// # Errors
+/// Returns `RuntimeError::InvalidConfig` if a transform's config cannot be
+/// deserialized (via `load_transform`).
+pub fn load(config: TransformsConfig) -> Result<Vec<Arc<dyn Transform>>, RuntimeError> {
+    let mut transforms: Vec<Arc<dyn Transform>> = vec![];
+    for (r#type, transform_config) in config.transforms {
+        // Peek at the shared `enabled` flag before deserializing the
+        // transform-specific config. NOTE(review): a malformed config falls
+        // back to `InitialTransformConfig::default()` — confirm the default
+        // `enabled` value is the intended behavior.
+        let initial_config =
+            serde_json::from_value::<InitialTransformConfig>(transform_config.clone())
+                .unwrap_or_default();
+        if !initial_config.enabled {
+            continue;
+        }
+
+        // Removed the dead `types` accumulator from the original: it was
+        // pushed to but never read.
+        let transform = load_transform(r#type, transform_config)?;
+        transforms.push(transform);
+    }
+
+    Ok(transforms)
+}
+
+/// Instantiates a single transform of the given type from its JSON config.
+fn load_transform(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<Arc<dyn Transform>, RuntimeError> {
+    match r#type {
+        TransformType::AddFields => {
+            let config = as_config(r#type, config)?;
+            Ok(Arc::new(AddFields::new(config)))
+        }
+        TransformType::DeleteFields => {
+            let config = as_config(r#type, config)?;
+            Ok(Arc::new(DeleteFields::new(config)))
+        }
+    }
+}
+
+/// Deserializes the JSON value into the given transform's typed config.
+///
+/// # Errors
+/// Returns `RuntimeError::InvalidConfig` (and logs the cause) on failure.
+fn as_config<T: DeserializeOwned>(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<T, RuntimeError> {
+    serde_json::from_value::<T>(config).map_err(|error| {
+        // `type` is a keyword, so `{type}` cannot be captured implicitly in
+        // the format string; pass the raw identifier explicitly instead.
+        error!(
+            "Failed to deserialize config for transform: {}. {error}",
+            r#type
+        );
+        RuntimeError::InvalidConfig
+    })
+}
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
new file mode 100644
index 00000000..0cace762
--- /dev/null
+++ b/core/connectors/sdk/Cargo.toml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_sdk"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org";
+documentation = "https://iggy.apache.org/docs";
+repository = "https://github.com/apache/iggy";
+readme = "../../README.md"
+
+[dependencies]
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = { workspace = true }
+flume = { workspace = true }
+iggy = { workspace = true }
+iggy_binary_protocol = { workspace = true }
+once_cell = { workspace = true }
+postcard = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+simd-json = { workspace = true }
+strum_macros = { workspace = true }
+thiserror = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+tokio = { workspace = true }
+uuid = { workspace = true }
diff --git a/core/connectors/sdk/src/decoders/json.rs 
b/core/connectors/sdk/src/decoders/json.rs
new file mode 100644
index 00000000..f2bda418
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/json.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Decodes message payloads as JSON documents using `simd_json`.
+pub struct JsonStreamDecoder;
+
+impl StreamDecoder for JsonStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    // `simd_json` parses in place, hence the mutable payload buffer.
+    fn decode(&self, mut payload: Vec<u8>) -> Result<Payload, Error> {
+        match simd_json::to_owned_value(&mut payload) {
+            Ok(value) => Ok(Payload::Json(value)),
+            Err(error) => {
+                error!("Failed to decode JSON payload: {error}");
+                Err(Error::CannotDecode(self.schema()))
+            }
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/mod.rs 
b/core/connectors/sdk/src/decoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/decoders/raw.rs 
b/core/connectors/sdk/src/decoders/raw.rs
new file mode 100644
index 00000000..efbddc12
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/raw.rs
@@ -0,0 +1,31 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+
+/// Passes message payloads through untouched as raw bytes.
+pub struct RawStreamDecoder;
+
+impl StreamDecoder for RawStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    // Raw payloads require no parsing; the bytes are wrapped as-is and this
+    // decoder is therefore infallible.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        let bytes = payload;
+        Ok(Payload::Raw(bytes))
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/text.rs 
b/core/connectors/sdk/src/decoders/text.rs
new file mode 100644
index 00000000..6d7722cb
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Decodes message payloads as UTF-8 text.
+pub struct TextStreamDecoder;
+
+impl StreamDecoder for TextStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    // Fails with `CannotDecode` when the payload is not valid UTF-8.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        match String::from_utf8(payload) {
+            Ok(text) => Ok(Payload::Text(text)),
+            Err(error) => {
+                error!("Failed to decode text payload: {error}");
+                Err(Error::CannotDecode(self.schema()))
+            }
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/json.rs 
b/core/connectors/sdk/src/encoders/json.rs
new file mode 100644
index 00000000..03f4e4f0
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/json.rs
@@ -0,0 +1,39 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Encodes message payloads into JSON bytes using `simd_json`.
+pub struct JsonStreamEncoder;
+
+impl StreamEncoder for JsonStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            // A text payload is serialized as a JSON string value.
+            Payload::Text(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            _ => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/mod.rs 
b/core/connectors/sdk/src/encoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/encoders/raw.rs 
b/core/connectors/sdk/src/encoders/raw.rs
new file mode 100644
index 00000000..68b3b9bb
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/raw.rs
@@ -0,0 +1,34 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Encodes raw payloads by returning their bytes unchanged.
+pub struct RawStreamEncoder;
+
+impl StreamEncoder for RawStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    // Only raw payloads can be passed through; every other payload type is
+    // rejected with `InvalidPayloadType`.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        if let Payload::Raw(bytes) = payload {
+            Ok(bytes)
+        } else {
+            Err(Error::InvalidPayloadType)
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/text.rs 
b/core/connectors/sdk/src/encoders/text.rs
new file mode 100644
index 00000000..6fb2d4e9
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Encodes text payloads as their UTF-8 bytes; JSON payloads are serialized
+/// to their JSON text form first.
+pub struct TextStreamEncoder;
+
+impl StreamEncoder for TextStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            Payload::Text(value) => Ok(value.into_bytes()),
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            _ => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
new file mode 100644
index 00000000..749c666e
--- /dev/null
+++ b/core/connectors/sdk/src/lib.rs
@@ -0,0 +1,239 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use decoders::{json::JsonStreamDecoder, raw::RawStreamDecoder, 
text::TextStreamDecoder};
+use encoders::{json::JsonStreamEncoder, raw::RawStreamEncoder, 
text::TextStreamEncoder};
+use iggy::prelude::{HeaderKey, HeaderValue};
+use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, sync::Arc};
+use strum_macros::{Display, IntoStaticStr};
+use thiserror::Error;
+use tokio::runtime::Runtime;
+
+pub mod decoders;
+pub mod encoders;
+pub mod sink;
+pub mod source;
+pub mod transforms;
+
+static RUNTIME: OnceCell<Runtime> = OnceCell::new();
+
+pub fn get_runtime() -> &'static Runtime {
+    RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio 
runtime"))
+}
+
/// A connector that produces messages to be ingested into Iggy.
#[async_trait]
pub trait Source: Send + Sync {
    /// Initializes the source (connections, state). Called once before polling starts.
    async fn open(&mut self) -> Result<(), Error>;
    /// Fetches the next batch of messages to produce; called repeatedly in a loop.
    async fn poll(&self) -> Result<ProducedMessages, Error>;
    /// Releases resources. Called once during shutdown.
    async fn close(&mut self);
}

/// A connector that consumes messages read from Iggy.
#[async_trait]
pub trait Sink: Send + Sync {
    /// Initializes the sink. Called once before any `consume` call.
    async fn open(&mut self) -> Result<(), Error>;
    /// Handles one batch of decoded messages for the given stream/topic.
    async fn consume(
        &self,
        topic_metadata: &TopicMetadata,
        messages_metadata: MessagesMetadata,
        messages: Vec<ConsumedMessage>,
    ) -> Result<(), Error>;
    /// Releases resources. Called once during shutdown.
    async fn close(&mut self);
}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Payload {
+    Json(simd_json::OwnedValue),
+    Raw(Vec<u8>),
+    Text(String),
+}
+
+impl Payload {
+    pub fn try_into_vec(self) -> Result<Vec<u8>, Error> {
+        match self {
+            Payload::Json(value) => {
+                Ok(simd_json::to_vec(&value).map_err(|_| 
Error::InvalidJsonPayload)?)
+            }
+            Payload::Raw(value) => Ok(value),
+            Payload::Text(text) => Ok(text.into_bytes()),
+        }
+    }
+}
+
+impl std::fmt::Display for Payload {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Payload::Json(value) => write!(
+                f,
+                "Json({})",
+                simd_json::to_string_pretty(value).unwrap_or_default()
+            ),
+            Payload::Raw(value) => write!(f, "Raw({:#?})", value),
+            Payload::Text(text) => write!(f, "Text({})", text),
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(
+    Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Display, 
IntoStaticStr,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum Schema {
+    #[strum(to_string = "json")]
+    Json,
+    #[strum(to_string = "raw")]
+    Raw,
+    #[strum(to_string = "text")]
+    Text,
+}
+
+impl Schema {
+    pub fn try_into_payload(self, mut value: Vec<u8>) -> Result<Payload, 
Error> {
+        match self {
+            Schema::Json => Ok(Payload::Json(
+                simd_json::to_owned_value(&mut value).map_err(|_| 
Error::InvalidJsonPayload)?,
+            )),
+            Schema::Raw => Ok(Payload::Raw(value)),
+            Schema::Text => Ok(Payload::Text(
+                String::from_utf8(value).map_err(|_| 
Error::InvalidTextPayload)?,
+            )),
+        }
+    }
+
+    pub fn decoder(self) -> Arc<dyn StreamDecoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamDecoder),
+            Schema::Raw => Arc::new(RawStreamDecoder),
+            Schema::Text => Arc::new(TextStreamDecoder),
+        }
+    }
+
+    pub fn encoder(self) -> Arc<dyn StreamEncoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamEncoder),
+            Schema::Raw => Arc::new(RawStreamEncoder),
+            Schema::Text => Arc::new(TextStreamEncoder),
+        }
+    }
+}
+
/// Identifies the stream/topic a batch of messages belongs to.
// NOTE(review): #[repr(C)] on types holding String/Vec/HashMap has no practical
// FFI effect — these values cross the plugin boundary postcard-serialized, not
// by layout. Confirm whether repr(C) is intentional on these DTOs.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct TopicMetadata {
    pub stream: String,
    pub topic: String,
}

/// Per-batch metadata accompanying consumed messages.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct MessagesMetadata {
    // Partition the batch was read from.
    pub partition_id: u32,
    pub current_offset: u64,
    // Schema used to decode each message's payload bytes.
    pub schema: Schema,
}

/// A message as received, with payload still in undecoded byte form.
#[repr(C)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceivedMessage {
    pub id: u128,
    pub offset: u64,
    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
    pub payload: Vec<u8>,
}

/// A batch emitted by a `Source`, plus the schema of the payloads within.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ProducedMessages {
    pub schema: Schema,
    pub messages: Vec<ProducedMessage>,
}

/// A single message emitted by a `Source`.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ProducedMessage {
    // Optional caller-supplied ID; presumably assigned downstream when None — verify.
    pub id: Option<u128>,
    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
    pub payload: Vec<u8>,
}

/// A message whose payload has been decoded into a typed `Payload`.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DecodedMessage {
    pub id: Option<u128>,
    pub offset: Option<u64>,
    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
    pub payload: Payload,
}

/// A serialization-friendly batch: headers and payloads kept as raw bytes.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RawMessages {
    pub schema: Schema,
    pub messages: Vec<RawMessage>,
}

/// A single raw message; `headers` is a postcard-encoded map (empty = no headers).
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct RawMessage {
    pub offset: u64,
    pub headers: Vec<u8>,
    pub payload: Vec<u8>,
}

/// A message handed to a `Sink`, with decoded headers and payload.
#[repr(C)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ConsumedMessage {
    pub offset: u64,
    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
    pub payload: Payload,
}

/// Decodes raw payload bytes into a typed `Payload` for one fixed schema.
pub trait StreamDecoder: Send + Sync {
    fn schema(&self) -> Schema;
    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error>;
}

/// Encodes a typed `Payload` into raw bytes for one fixed schema.
pub trait StreamEncoder: Send + Sync {
    fn schema(&self) -> Schema;
    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error>;
}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Error)]
+pub enum Error {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("HTTP request failed: {0}")]
+    HttpRequestFailed(String),
+    #[error("Init error: {0}")]
+    InitError(String),
+    #[error("Invalid payload type")]
+    InvalidPayloadType,
+    #[error("Invalid JSON payload.")]
+    InvalidJsonPayload,
+    #[error("Invalid text payload.")]
+    InvalidTextPayload,
+    #[error("Cannot decode schema {0}")]
+    CannotDecode(Schema),
+}
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
new file mode 100644
index 00000000..bd836ac6
--- /dev/null
+++ b/core/connectors/sdk/src/sink.rs
@@ -0,0 +1,240 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::de::DeserializeOwned;
+use tokio::sync::watch;
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
+use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink, 
TopicMetadata, get_runtime};
+
/// FFI callback used to hand a batch of messages to a sink plugin.
///
/// Each pointer/length pair describes a postcard-encoded buffer
/// (`TopicMetadata`, `MessagesMetadata`, `RawMessages`) that must remain valid
/// for the duration of the call. Returns 0 on success, non-zero on failure.
pub type ConsumeCallback = extern "C" fn(
    plugin_id: u32,
    topic_meta_ptr: *const u8,
    topic_meta_len: usize,
    messages_meta_ptr: *const u8,
    messages_meta_len: usize,
    messages_ptr: *const u8,
    messages_len: usize,
) -> i32;
+
/// Owns a single sink plugin instance and bridges the C FFI boundary to it.
#[derive(Debug)]
pub struct SinkContainer<T: Sink + std::fmt::Debug> {
    // Plugin ID assigned by the runtime.
    id: u32,
    // The sink instance; None until `open` runs, and again after `close`.
    sink: Option<T>,
    // Shutdown signal sender; only fired from `close`.
    shutdown: Option<watch::Sender<()>>,
}

impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
    /// Creates an empty container for the given plugin ID.
    pub const fn new(id: u32) -> Self {
        Self {
            id,
            sink: None,
            shutdown: None,
        }
    }

    /// Deserializes the JSON config, builds the sink via `factory`, and opens it.
    /// Returns 0 on success, 1 when `Sink::open` fails, -1 on invalid input.
    ///
    /// # Safety
    /// `config_ptr` must point to `config_len` readable bytes that stay valid
    /// for the duration of the call.
    pub unsafe fn open<F, C>(
        &mut self,
        id: u32,
        config_ptr: *const u8,
        config_len: usize,
        factory: F,
    ) -> i32
    where
        F: FnOnce(u32, C) -> T,
        C: DeserializeOwned,
    {
        unsafe {
            // Best-effort tracing init; ignore the error if already installed.
            _ = fmt::try_init();
            let slice = std::slice::from_raw_parts(config_ptr, config_len);
            let Ok(config_str) = std::str::from_utf8(slice) else {
                return -1;
            };

            let Ok(config) = serde_json::from_str(config_str) else {
                return -1;
            };

            let mut sink = factory(id, config);
            let runtime = get_runtime();
            let result = runtime.block_on(sink.open());
            self.id = id;
            // The sink is stored even when `open` failed; callers must check the
            // returned code before invoking `consume`.
            self.sink = Some(sink);
            if result.is_ok() { 0 } else { 1 }
        }
    }

    /// Signals shutdown, closes the sink, and drops it. Returns 0 on success,
    /// -1 when the sink was never opened (or already closed).
    ///
    /// # Safety
    /// No pointer arguments; callable whenever the container is not being used
    /// concurrently from another thread.
    pub unsafe fn close(&mut self) -> i32 {
        let Some(mut sink) = self.sink.take() else {
            error!(
                "Sink with ID: {} is not initialized - cannot close.",
                self.id
            );
            return -1;
        };

        info!("Closing sink with ID: {}...", self.id);
        if let Some(tx) = self.shutdown.take() {
            let _ = tx.send(());
        }

        let runtime = get_runtime();
        runtime.block_on(sink.close());
        info!("Closed sink with ID: {}", self.id);
        0
    }

    /// Decodes the postcard-encoded metadata and messages, then forwards them
    /// to `Sink::consume`. Messages whose headers or payload fail to decode are
    /// skipped (logged), not fatal. Returns 0 on success, 1 when the sink
    /// errors, -1 on decode failure or when the sink is not initialized.
    ///
    /// # Safety
    /// Every pointer must reference at least the paired number of readable
    /// bytes, valid for the duration of the call.
    pub unsafe fn consume(
        &self,
        topic_meta_ptr: *const u8,
        topic_meta_len: usize,
        messages_meta_ptr: *const u8,
        messages_meta_len: usize,
        messages_ptr: *const u8,
        messages_len: usize,
    ) -> i32 {
        unsafe {
            let Some(sink) = self.sink.as_ref() else {
                error!(
                    "Sink with ID: {} is not initialized - cannot consume.",
                    self.id
                );
                return -1;
            };

            // Copy the borrowed FFI buffers into owned vectors before decoding.
            let topic_meta_slice =
                std::slice::from_raw_parts(topic_meta_ptr, topic_meta_len).to_vec();
            let messages_meta_slice =
                std::slice::from_raw_parts(messages_meta_ptr, messages_meta_len).to_vec();
            let messages_slice = std::slice::from_raw_parts(messages_ptr, messages_len).to_vec();

            let Ok(topic_metadata) = postcard::from_bytes::<TopicMetadata>(&topic_meta_slice)
            else {
                error!("Failed to decode topic metadata");
                return -1;
            };

            let Ok(messages_metadata) =
                postcard::from_bytes::<MessagesMetadata>(&messages_meta_slice)
            else {
                error!("Failed to decode messages metadata");
                return -1;
            };

            let Ok(raw_messages) = postcard::from_bytes::<RawMessages>(&messages_slice) else {
                error!("Failed to decode raw messages");
                return -1;
            };

            let mut messages = Vec::with_capacity(raw_messages.messages.len());
            for message in raw_messages.messages {
                // Empty headers bytes mean "no headers" by convention.
                let headers = if message.headers.is_empty() {
                    None
                } else {
                    let Ok(headers) = postcard::from_bytes(&message.headers) else {
                        error!("Failed to decode message headers");
                        continue;
                    };
                    Some(headers)
                };

                let Ok(payload) = messages_metadata.schema.try_into_payload(message.payload) else {
                    error!("Failed to decode message payload");
                    continue;
                };

                messages.push(ConsumedMessage {
                    offset: message.offset,
                    headers,
                    payload,
                })
            }

            let runtime = get_runtime();
            let result =
                runtime.block_on(sink.consume(&topic_metadata, messages_metadata, messages));
            if result.is_ok() { 0 } else { 1 }
        }
    }
}
+
/// Generates the C FFI entry points (`open`, `consume`, `close`) for a sink
/// connector type. Instances are stored in a global map keyed by plugin ID, so
/// one dynamic library can host many plugin instances.
#[macro_export]
macro_rules! sink_connector {
    ($type:ty) => {
        // Compile-time assertion that `$type` implements `Sink`.
        const _: fn() = || {
            fn assert_trait<T: $crate::Sink>() {}
            assert_trait::<$type>();
        };

        use dashmap::DashMap;
        use once_cell::sync::Lazy;
        use $crate::sink::SinkContainer;

        static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> = Lazy::new(DashMap::new);

        #[unsafe(no_mangle)]
        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
            let mut container = SinkContainer::new(id);
            let result = container.open(id, config_ptr, config_len, <$type>::new);
            // The container is registered even on failure; the caller is expected
            // to check the returned status code.
            INSTANCES.insert(id, container);
            result
        }

        #[unsafe(no_mangle)]
        unsafe extern "C" fn consume(
            id: u32,
            topic_meta_ptr: *const u8,
            topic_meta_len: usize,
            messages_meta_ptr: *const u8,
            messages_meta_len: usize,
            messages_ptr: *const u8,
            messages_len: usize,
        ) -> i32 {
            let Some(instance) = INSTANCES.get(&id) else {
                eprintln!(
                    "Sink connector with ID: {id} was not found and consume cannot be invoked."
                );
                return -1;
            };
            instance.consume(
                topic_meta_ptr,
                topic_meta_len,
                messages_meta_ptr,
                messages_meta_len,
                messages_ptr,
                messages_len,
            )
        }

        #[unsafe(no_mangle)]
        unsafe extern "C" fn close(id: u32) -> i32 {
            // `remove` drops the map entry; `.1` is the container itself.
            let Some(mut instance) = INSTANCES.remove(&id) else {
                eprintln!("Sink connector with ID: {id} was not found and cannot be closed.");
                return -1;
            };
            instance.1.close()
        }
    };
}
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
new file mode 100644
index 00000000..ef47b7ee
--- /dev/null
+++ b/core/connectors/sdk/src/source.rs
@@ -0,0 +1,218 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Source, get_runtime};
+use serde::de::DeserializeOwned;
+use std::sync::Arc;
+use tokio::{sync::watch, task::JoinHandle};
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
/// FFI view of a single message: borrowed header and payload buffers.
#[repr(C)]
pub struct RawMessage {
    pub offset: u64,
    pub headers_ptr: *const u8,
    pub headers_len: usize,
    pub payload_ptr: *const u8,
    pub payload_len: usize,
}

/// Entry point the runtime calls to start a source's poll loop, providing the
/// callback used to hand batches back. Returns 0 on success.
pub type HandleCallback = extern "C" fn(plugin_id: u32, callback: SendCallback) -> i32;

/// Callback invoked with a postcard-encoded `ProducedMessages` buffer; the
/// buffer is only valid for the duration of the call.
pub type SendCallback = extern "C" fn(plugin_id: u32, messages_ptr: *const u8, messages_len: usize);

/// Owns a single source plugin instance plus its background poll task.
#[derive(Debug)]
pub struct SourceContainer<T: Source + std::fmt::Debug> {
    // Plugin ID assigned by the runtime.
    id: u32,
    // Shared with the poll task; None until `open`, and again after `close`.
    source: Option<Arc<T>>,
    // Signals the poll task to stop.
    shutdown: Option<watch::Sender<()>>,
    // Join handle of the spawned poll task.
    task: Option<JoinHandle<()>>,
}
+
impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> {
    /// Creates an empty container for the given plugin ID.
    pub const fn new(id: u32) -> Self {
        Self {
            id,
            source: None,
            shutdown: None,
            task: None,
        }
    }

    /// Deserializes the JSON config, builds the source via `factory`, and opens
    /// it. Returns 0 on success, 1 when `Source::open` fails, -1 on invalid input.
    ///
    /// # Safety
    /// `config_ptr` must point to `config_len` readable bytes that stay valid
    /// for the duration of the call.
    pub unsafe fn open<F, C>(
        &mut self,
        id: u32,
        config_ptr: *const u8,
        config_len: usize,
        factory: F,
    ) -> i32
    where
        F: FnOnce(u32, C) -> T,
        C: DeserializeOwned,
    {
        unsafe {
            // Best-effort tracing init; ignore the error if already installed.
            _ = fmt::try_init();
            let slice = std::slice::from_raw_parts(config_ptr, config_len);
            let Ok(config_str) = std::str::from_utf8(slice) else {
                return -1;
            };

            let Ok(config) = serde_json::from_str(config_str) else {
                return -1;
            };

            let mut source = factory(id, config);
            let runtime = get_runtime();
            let result = runtime.block_on(source.open());
            self.id = id;
            // Arc so the background poll task can share the source.
            self.source = Some(Arc::new(source));
            if result.is_ok() { 0 } else { 1 }
        }
    }

    /// Signals the poll task to stop, waits for it, then closes and drops the
    /// source. Returns 0 on success, -1 when not initialized or when another
    /// Arc reference is still alive.
    ///
    /// # Safety
    /// No pointer arguments; callable whenever the container is not being used
    /// concurrently from another thread.
    pub unsafe fn close(&mut self) -> i32 {
        let Some(source) = self.source.take() else {
            error!(
                "Source with ID: {} is not initialized - cannot close.",
                self.id
            );
            return -1;
        };

        info!("Closing source with ID: {}...", self.id);
        if let Some(sender) = self.shutdown.take() {
            let _ = sender.send(());
        }

        let runtime = get_runtime();
        // Join the poll task first so its Arc clone is dropped before try_unwrap.
        if let Some(handle) = self.task.take() {
            let _ = runtime.block_on(handle);
        }

        let Ok(mut source) = Arc::try_unwrap(source) else {
            error!("Source with ID: {} was already closed.", self.id);
            return -1;
        };

        runtime.block_on(source.close());
        info!("Closed source with ID: {}", self.id);
        0
    }

    /// Spawns the background poll loop, which forwards serialized batches to
    /// `callback` until shutdown. Returns 0 on success, -1 when not initialized.
    ///
    /// # Safety
    /// `callback` must be a valid function pointer with the declared ABI.
    // NOTE(review): calling `handle` twice replaces `shutdown`/`task` without
    // stopping the previous loop — confirm it is invoked at most once per plugin.
    pub unsafe fn handle(&mut self, callback: SendCallback) -> i32 {
        let Some(source) = self.source.as_ref() else {
            error!(
                "Source with ID: {} is not initialized - cannot handle.",
                self.id
            );
            return -1;
        };

        let runtime = get_runtime();
        let (shutdown_tx, shutdown_rx) = watch::channel(());
        let plugin_id = self.id;
        let source = Arc::clone(source);
        let handle = runtime.spawn(async move {
            let _ = handle_messages(plugin_id, source, callback, shutdown_rx).await;
        });

        self.shutdown = Some(shutdown_tx);
        self.task = Some(handle);
        0
    }
}
+
/// Poll loop for one source plugin: repeatedly polls the source and forwards
/// each postcard-serialized batch to `callback` until the shutdown signal
/// fires. Poll or serialization failures are logged and the loop continues.
async fn handle_messages<T: Source>(
    plugin_id: u32,
    source: Arc<T>,
    callback: SendCallback,
    mut shutdown: watch::Receiver<()>,
) -> Result<(), Error> {
    loop {
        tokio::select! {
            _ = shutdown.changed() => {
                info!("Shutting down source container with ID: {plugin_id}");
                break;
            }
            messages = source.poll() => {
                let Ok(messages) = messages else {
                    error!("Failed to poll messages for source container with ID: {plugin_id}");
                    continue;
                };

                let Ok(messages) = postcard::to_allocvec(&messages) else {
                    error!("Failed to serialize messages for source container with ID: {plugin_id}");
                    continue;
                };

                // The buffer is dropped right after this call, so the callee
                // must not retain the pointer beyond the call.
                callback(plugin_id, messages.as_ptr(), messages.len());
            }
        }
    }

    Ok(())
}
+
/// Generates the C FFI entry points (`open`, `handle`, `close`) for a source
/// connector type. Instances are stored in a global map keyed by plugin ID, so
/// one dynamic library can host many plugin instances.
#[macro_export]
macro_rules! source_connector {
    ($type:ty) => {
        // Compile-time assertion that `$type` implements `Source`.
        const _: fn() = || {
            fn assert_trait<T: $crate::Source>() {}
            assert_trait::<$type>();
        };

        use dashmap::DashMap;
        use once_cell::sync::Lazy;
        use $crate::source::SendCallback;
        use $crate::source::SourceContainer;

        static INSTANCES: Lazy<DashMap<u32, SourceContainer<$type>>> = Lazy::new(DashMap::new);

        #[unsafe(no_mangle)]
        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
            let mut container = SourceContainer::new(id);
            let result = container.open(id, config_ptr, config_len, <$type>::new);
            // The container is registered even on failure; the caller is expected
            // to check the returned status code.
            INSTANCES.insert(id, container);
            result
        }

        #[unsafe(no_mangle)]
        unsafe extern "C" fn handle(id: u32, callback: SendCallback) -> i32 {
            let Some(mut instance) = INSTANCES.get_mut(&id) else {
                eprintln!("Source connector with ID: {id} was not found and cannot be handled.");
                return -1;
            };
            instance.handle(callback)
        }

        #[unsafe(no_mangle)]
        unsafe extern "C" fn close(id: u32) -> i32 {
            // `remove` drops the map entry; `.1` is the container itself.
            let Some(mut instance) = INSTANCES.remove(&id) else {
                eprintln!("Source connector with ID: {id} was not found and cannot be closed.");
                return -1;
            };
            instance.1.close()
        }
    };
}
diff --git a/core/connectors/sdk/src/transforms/add_fields.rs 
b/core/connectors/sdk/src/transforms/add_fields.rs
new file mode 100644
index 00000000..569a16b8
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/add_fields.rs
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+use strum_macros::{Display, IntoStaticStr};
+
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+
+use super::{Transform, TransformType};
+
/// Configuration for the `AddFields` transform.
#[derive(Debug, Serialize, Deserialize)]
pub struct AddFieldsConfig {
    // Fields to insert into each JSON object payload.
    fields: Vec<Field>,
}

/// Transform that inserts static or computed fields into JSON object payloads.
pub struct AddFields {
    fields: Vec<Field>,
}

/// A single field to add: target key plus its (static or computed) value.
#[derive(Debug, Serialize, Deserialize)]
struct Field {
    key: String,
    value: FieldValue,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum FieldValue {
    /// A fixed JSON value copied into every message.
    Static(simd_json::OwnedValue),
    /// A value computed fresh per message (timestamps, UUIDs).
    Computed(ComputedValue),
}

/// Supported computed-value kinds; evaluated at transform time.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
#[serde(rename_all = "snake_case")]
enum ComputedValue {
    #[strum(to_string = "date_time")]
    DateTime,
    #[strum(to_string = "timestamp_nanos")]
    TimestampNanos,
    #[strum(to_string = "timestamp_micros")]
    TimestampMicros,
    #[strum(to_string = "timestamp_millis")]
    TimestampMillis,
    #[strum(to_string = "timestamp_seconds")]
    TimestampSeconds,
    #[strum(to_string = "uuid_v4")]
    UuidV4,
    #[strum(to_string = "uuid_v7")]
    UuidV7,
}

impl AddFields {
    /// Builds the transform from its deserialized configuration.
    pub fn new(config: AddFieldsConfig) -> Self {
        Self {
            fields: config.fields,
        }
    }
}
+
impl Transform for AddFields {
    fn r#type(&self) -> TransformType {
        TransformType::AddFields
    }

    /// Inserts the configured fields into JSON object payloads. Non-JSON (and
    /// non-object JSON) payloads pass through unchanged. Existing keys with the
    /// same name are overwritten (`map.insert` semantics).
    fn transform(
        &self,
        _metadata: &TopicMetadata,
        mut message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        if self.fields.is_empty() {
            return Ok(Some(message));
        }

        // Only JSON objects can take new fields; everything else is passed through.
        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
            return Ok(Some(message));
        };

        for field in &self.fields {
            // Each arm evaluates to the Option returned by `insert`, discarded by `;`.
            match &field.value {
                FieldValue::Static(value) => map.insert(field.key.clone(), value.clone()),
                FieldValue::Computed(value) => match value {
                    ComputedValue::DateTime => {
                        map.insert(field.key.clone(), chrono::Utc::now().to_rfc3339().into())
                    }
                    ComputedValue::TimestampMillis => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_millis().into(),
                    ),
                    ComputedValue::TimestampMicros => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_micros().into(),
                    ),
                    // timestamp_nanos_opt() yields None outside the representable
                    // range, in which case a JSON null is inserted.
                    ComputedValue::TimestampNanos => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_nanos_opt().into(),
                    ),
                    ComputedValue::TimestampSeconds => {
                        map.insert(field.key.clone(), chrono::Utc::now().timestamp().into())
                    }
                    ComputedValue::UuidV4 => {
                        map.insert(field.key.clone(), uuid::Uuid::new_v4().to_string().into())
                    }
                    ComputedValue::UuidV7 => {
                        map.insert(field.key.clone(), uuid::Uuid::now_v7().to_string().into())
                    }
                },
            };
        }

        Ok(Some(message))
    }
}
diff --git a/core/connectors/sdk/src/transforms/delete_fields.rs 
b/core/connectors/sdk/src/transforms/delete_fields.rs
new file mode 100644
index 00000000..d7dd755a
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/delete_fields.rs
@@ -0,0 +1,61 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::collections::HashSet;
+
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+
+use super::{Transform, TransformType};
+
/// Configuration for the `DeleteFields` transform.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteFieldsConfig {
    // Top-level keys to remove from JSON object payloads.
    fields: Vec<String>,
}

/// Transform that strips the configured keys from JSON object payloads.
pub struct DeleteFields {
    // HashSet for O(1) membership checks during `retain`.
    fields: HashSet<String>,
}

impl DeleteFields {
    /// Builds the transform from its deserialized configuration,
    /// deduplicating the configured keys.
    pub fn new(config: DeleteFieldsConfig) -> Self {
        Self {
            fields: config.fields.into_iter().collect(),
        }
    }
}
+
+impl Transform for DeleteFields {
+    fn r#type(&self) -> TransformType {
+        TransformType::DeleteFields
+    }
+
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if let Payload::Json(OwnedValue::Object(ref mut map)) = 
message.payload {
+            map.retain(|key, _| !self.fields.contains(key));
+        }
+
+        Ok(Some(message))
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/mod.rs 
b/core/connectors/sdk/src/transforms/mod.rs
new file mode 100644
index 00000000..35cf3382
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/mod.rs
@@ -0,0 +1,45 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use strum_macros::{Display, IntoStaticStr};
+
+use crate::{DecodedMessage, Error, TopicMetadata};
+
+pub mod add_fields;
+pub mod delete_fields;
+
/// A message transform applied between decoding and delivery.
pub trait Transform: Send + Sync {
    /// The kind of this transform, used for configuration/dispatch.
    fn r#type(&self) -> TransformType;
    /// Transforms one message. Returning `Ok(None)` drops the message;
    /// `Ok(Some(_))` forwards the (possibly modified) message.
    fn transform(
        &self,
        metadata: &TopicMetadata,
        message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error>;
}

/// Identifiers for the built-in transforms; serialized in snake_case.
#[derive(
    Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Display, IntoStaticStr,
)]
#[serde(rename_all = "snake_case")]
pub enum TransformType {
    #[strum(to_string = "add_fields")]
    AddFields,
    #[strum(to_string = "delete_fields")]
    DeleteFields,
}
diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml 
b/core/connectors/sinks/quickwit_sink/Cargo.toml
new file mode 100644
index 00000000..5848d24f
--- /dev/null
+++ b/core/connectors/sinks/quickwit_sink/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_quickwit_sink"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+iggy_connector_sdk = { workspace = true }
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+once_cell = { workspace = true }
+serde = { workspace = true }
+postcard = { workspace = true }
+reqwest = { workspace = true }
+serde_yml = { workspace = true }
+simd-json = { workspace = true }
+toml = { workspace = true }
+tracing = { workspace = true }
diff --git a/core/connectors/sinks/quickwit_sink/src/lib.rs 
b/core/connectors/sinks/quickwit_sink/src/lib.rs
new file mode 100644
index 00000000..cf0962cd
--- /dev/null
+++ b/core/connectors/sinks/quickwit_sink/src/lib.rs
@@ -0,0 +1,209 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, 
sink_connector,
+};
+use serde::{Deserialize, Serialize};
+use tracing::{error, info, warn};
+
+sink_connector!(QuickwitSink);
+
/// Sink connector that ships consumed JSON messages to a Quickwit search
/// cluster over its REST API.
#[derive(Debug)]
pub struct QuickwitSink {
    // Unique connector instance ID assigned by the runtime.
    id: u32,
    config: QuickwitSinkConfig,
    // Reused HTTP client (connection pooling) for all Quickwit calls.
    client: reqwest::Client,
    // Index ID extracted from the YAML index definition in `config.index`.
    index_id: String,
}

/// User-provided configuration for the Quickwit sink.
#[derive(Debug, Serialize, Deserialize)]
pub struct QuickwitSinkConfig {
    // Base URL of the Quickwit REST API, e.g. `http://localhost:7280`.
    url: String,
    // Full Quickwit index definition as a raw YAML document.
    index: String,
}

/// Minimal view of the YAML index definition — only the ID is needed here.
#[derive(Debug, Serialize, Deserialize)]
struct IndexConfig {
    index_id: String,
}
+
impl QuickwitSink {
    /// Creates a sink instance from its configuration.
    ///
    /// # Panics
    /// Panics when `config.index` is not valid YAML containing an
    /// `index_id` field. This runs at connector-load time, so failing fast
    /// on a broken configuration is deliberate.
    pub fn new(id: u32, config: QuickwitSinkConfig) -> Self {
        let index_config =
            serde_yml::from_str::<IndexConfig>(&config.index).expect("Invalid index config.");
        QuickwitSink {
            id,
            config,
            index_id: index_config.index_id,
            client: reqwest::Client::new(),
        }
    }

    /// Returns whether the configured index already exists in Quickwit.
    ///
    /// A 2xx response means it exists, 404 means it does not; any other
    /// status is surfaced as `Error::HttpRequestFailed`.
    async fn has_index(&self) -> Result<bool, Error> {
        let url = format!("{}/api/v1/indexes/{}", self.config.url, self.index_id);
        let response = self.client.get(&url).send().await.map_err(|error| {
            error!(
                "Failed to send HTTP request to check if index with ID: {} exists. {error}",
                self.index_id
            );
            Error::HttpRequestFailed(error.to_string())
        })?;
        let status = response.status();
        if status.is_success() {
            Ok(true)
        } else if status == reqwest::StatusCode::NOT_FOUND {
            Ok(false)
        } else {
            Err(Error::HttpRequestFailed(format!(
                "Unexpected status code: {status}",
            )))
        }
    }

    /// Creates the index in Quickwit by POSTing the raw YAML definition
    /// from the configuration to the indexes endpoint.
    async fn create_index(&self) -> Result<(), Error> {
        info!("Creating index: {}", self.index_id);
        let url = format!("{}/api/v1/indexes", self.config.url);
        let response = self
            .client
            .post(&url)
            // Quickwit accepts index definitions as YAML documents.
            .header("content-type", "application/yaml")
            .body(self.config.index.to_owned())
            .send()
            .await
            .map_err(|error| {
                error!(
                    "Failed to send HTTP request to create index: {}. {error}",
                    self.index_id
                );
                Error::HttpRequestFailed(error.to_string())
            })?;

        if !response.status().is_success() {
            let status = response.status();
            // Best-effort body read for diagnostics; empty string on failure.
            let reason = response.text().await.unwrap_or_default();
            error!(
                "Received an invalid HTTP response when creating index: {}. Status code: {status}, reason: {reason}",
                self.index_id
            );
            return Err(Error::InitError(format!(
                "Failed to create index: {}. {reason}",
                self.index_id
            )));
        }

        info!("Created index: {}", self.index_id);
        Ok(())
    }

    /// Ingests a batch of JSON documents into the index via the ingest
    /// endpoint with `commit=auto`, sending one document per line (NDJSON).
    pub async fn ingest(&self, messages: Vec<simd_json::OwnedValue>) -> Result<(), Error> {
        let url = format!(
            "{}/api/v1/{}/ingest?commit=auto",
            self.config.url, self.index_id
        );
        info!("Ingesting messages for index: {}...", self.index_id);
        let messages_count = messages.len();
        // NOTE(review): documents that fail to serialize are silently dropped
        // here (filter_map over Ok) — confirm this is the intended behavior.
        let messages = messages
            .into_iter()
            .filter_map(|record| simd_json::to_string(&record).ok())
            .collect::<Vec<_>>()
            .join("\n");

        let response = self
            .client
            .post(&url)
            .body(messages)
            .send()
            .await
            .map_err(|error| {
                error!(
                    "Failed to send HTTP request to ingest messages for index: {}. {error}",
                    self.index_id
                );
                Error::HttpRequestFailed(error.to_string())
            })?;

        if !response.status().is_success() {
            let status = response.status();
            let text = response.text().await.unwrap_or_default();
            error!(
                "Received an invalid HTTP response when ingesting messages for index: {}. Status code: {status}, reason: {text}",
                self.index_id
            );
            return Err(Error::HttpRequestFailed(format!(
                "Status code: {status}, reason: {text}"
            )));
        }

        info!(
            "Ingested {messages_count} messages for index: {}",
            self.index_id
        );
        Ok(())
    }
}
+
#[async_trait]
impl Sink for QuickwitSink {
    /// Ensures the target index exists, creating it when missing.
    async fn open(&mut self) -> Result<(), Error> {
        info!(
            "Initialized QuickwitSink with ID: {} for URL: {}",
            self.id, self.config.url
        );
        if !self.has_index().await? {
            self.create_index().await?;
        }
        Ok(())
    }

    /// Forwards JSON payloads from the batch to Quickwit; non-JSON payloads
    /// are logged and skipped rather than failing the whole batch.
    async fn consume(
        &self,
        _topic_metadata: &TopicMetadata,
        messages_metadata: MessagesMetadata,
        messages: Vec<ConsumedMessage>,
    ) -> Result<(), Error> {
        info!(
            "Quickwit sink with ID: {} received: {} messages, format: {}",
            self.id,
            messages.len(),
            messages_metadata.schema
        );

        let mut json_payloads = Vec::with_capacity(messages.len());
        for message in messages {
            match message.payload {
                Payload::Json(value) => json_payloads.push(value),
                _ => {
                    // Only JSON can be ingested; one warning per skipped message.
                    warn!("Unsupported payload format: {}", messages_metadata.schema);
                }
            }
        }

        // Nothing ingestible — avoid an empty HTTP request.
        if json_payloads.is_empty() {
            return Ok(());
        }

        self.ingest(json_payloads).await?;
        Ok(())
    }

    /// No resources to release; just logs shutdown.
    async fn close(&mut self) {
        info!("Quickwit sink with ID: {} is shutting down", self.id);
    }
}
diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml 
b/core/connectors/sinks/stdout_sink/Cargo.toml
new file mode 100644
index 00000000..8e834833
--- /dev/null
+++ b/core/connectors/sinks/stdout_sink/Cargo.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_stdout_sink"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+iggy_connector_sdk = { workspace = true }
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+once_cell = { workspace = true }
+serde = { workspace = true }
+postcard = { workspace = true }
+toml = { workspace = true }
+tracing = { workspace = true }
diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs 
b/core/connectors/sinks/stdout_sink/src/lib.rs
new file mode 100644
index 00000000..afa9df59
--- /dev/null
+++ b/core/connectors/sinks/stdout_sink/src/lib.rs
@@ -0,0 +1,88 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, 
sink_connector,
+};
+use serde::{Deserialize, Serialize};
+use tracing::info;
+
+sink_connector!(StdoutSink);
+
+#[derive(Debug)]
+pub struct StdoutSink {
+    id: u32,
+    print_payload: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StdoutSinkConfig {
+    print_payload: Option<bool>,
+}
+
+impl StdoutSink {
+    pub fn new(id: u32, config: StdoutSinkConfig) -> Self {
+        StdoutSink {
+            id,
+            print_payload: config.print_payload.unwrap_or(false),
+        }
+    }
+}
+
+#[async_trait]
+impl Sink for StdoutSink {
+    async fn open(&mut self) -> Result<(), Error> {
+        info!(
+            "Initialized stdout sink with ID: {}. Print payload: {}",
+            self.id, self.print_payload
+        );
+        Ok(())
+    }
+
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        info!(
+            "Stdout sink with ID: {} received: {} messages, schema: {}, 
stream: {}, topic: {}, partition: {}, offset: {}",
+            self.id,
+            messages.len(),
+            messages_metadata.schema,
+            topic_metadata.stream,
+            topic_metadata.stream,
+            messages_metadata.partition_id,
+            messages_metadata.current_offset
+        );
+        if self.print_payload {
+            for message in messages {
+                info!(
+                    "Message offset: {}, payload: {:#?}",
+                    message.offset, message.payload
+                );
+            }
+        }
+        Ok(())
+    }
+
+    async fn close(&mut self) {
+        info!("Stdout sink with ID: {} is shutting down", self.id);
+    }
+}
diff --git a/core/connectors/sources/test_source/Cargo.toml 
b/core/connectors/sources/test_source/Cargo.toml
new file mode 100644
index 00000000..f0eb76c8
--- /dev/null
+++ b/core/connectors/sources/test_source/Cargo.toml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iggy_connector_test_source"
+version = "0.1.0"
+description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../README.md"
+
+[lib]
+crate-type = ["cdylib"]
+
+[dependencies]
+iggy_connector_sdk = { workspace = true }
+async-trait = { workspace = true }
+dashmap = { workspace = true }
+once_cell = { workspace = true }
+flume = { workspace = true }
+humantime = { workspace = true }
+serde = { workspace = true }
+rand = { workspace = true }
+postcard = { workspace = true }
+simd-json = { workspace = true }
+toml = { workspace = true }
+tracing = { workspace = true }
+tokio = { workspace = true }
diff --git a/core/connectors/sources/test_source/src/lib.rs 
b/core/connectors/sources/test_source/src/lib.rs
new file mode 100644
index 00000000..9d8c4cf1
--- /dev/null
+++ b/core/connectors/sources/test_source/src/lib.rs
@@ -0,0 +1,157 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::{str::FromStr, time::Duration};
+
+use async_trait::async_trait;
+use iggy_connector_sdk::{ProducedMessage, ProducedMessages, Schema, Source, 
source_connector};
+use rand::{
+    Rng,
+    distr::{Alphanumeric, Uniform},
+};
+use serde::{Deserialize, Serialize};
+use tokio::{sync::Mutex, time::sleep};
+use tracing::info;
+
+source_connector!(TestSource);
+
/// Source connector that periodically emits batches of random JSON
/// messages, intended for exercising pipelines end to end.
#[derive(Debug)]
pub struct TestSource {
    // Unique connector instance ID assigned by the runtime.
    id: u32,
    // Optional cap on the total number of produced messages.
    max_count: Option<usize>,
    // Delay applied before each poll.
    interval: Duration,
    // (min, max) number of messages generated per poll.
    messages_range: (u32, u32),
    // Length of the random text payload in characters.
    payload_size: u32,
    // Mutable counters behind an async mutex, since `poll` takes `&self`.
    state: Mutex<State>,
}

/// User configuration; every field is optional and has a default.
#[derive(Debug, Serialize, Deserialize)]
pub struct TestSourceConfig {
    // Human-readable interval such as "500ms" or "1s"; defaults to "1s".
    interval: Option<String>,
    max_count: Option<usize>,
    messages_range: Option<(u32, u32)>,
    payload_size: Option<u32>,
}

/// Internal mutable state guarded by the async mutex.
#[derive(Debug)]
struct State {
    // Total number of messages produced so far.
    current_number: usize,
}
+
+impl TestSource {
+    pub fn new(id: u32, config: TestSourceConfig) -> Self {
+        let interval = config.interval.unwrap_or("1s".to_string());
+        let interval = humantime::Duration::from_str(&interval)
+            .unwrap_or(humantime::Duration::from_str("1s").unwrap());
+        TestSource {
+            id,
+            max_count: config.max_count,
+            interval: *interval,
+            messages_range: config.messages_range.unwrap_or((10, 50)),
+            payload_size: config.payload_size.unwrap_or(100),
+            state: Mutex::new(State { current_number: 0 }),
+        }
+    }
+
+    fn generate_messages(&self) -> Vec<ProducedMessage> {
+        let mut messages = Vec::new();
+        let mut rng = rand::rng();
+        let messages_count =
+            rng.sample(Uniform::new(self.messages_range.0, 
self.messages_range.1).unwrap());
+        for _ in 0..messages_count {
+            let record = Record {
+                title: "Hello".to_string(),
+                name: "World".to_string(),
+                age: 30,
+                text: self.generate_random_text(),
+            };
+            let message = ProducedMessage {
+                id: None,
+                headers: None,
+                payload: simd_json::to_vec(&record).unwrap(),
+            };
+            messages.push(message);
+        }
+        messages
+    }
+
+    fn generate_random_text(&self) -> String {
+        let mut rng = rand::rng();
+        let text: String = (0..self.payload_size)
+            .map(|_| rng.sample(Alphanumeric) as char)
+            .collect();
+        text
+    }
+}
+
+#[async_trait]
+impl Source for TestSource {
+    async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> {
+        info!(
+            "Initialized test source with ID {}. Interval: {:#?}, max offset: 
{:#?}, messages range: {} - {}, payload size: {}",
+            self.id,
+            self.interval,
+            self.max_count,
+            self.messages_range.0,
+            self.messages_range.1,
+            self.payload_size
+        );
+        Ok(())
+    }
+
+    async fn poll(&self) -> Result<ProducedMessages, 
iggy_connector_sdk::Error> {
+        sleep(self.interval).await;
+        let mut state = self.state.lock().await;
+        if let Some(max_count) = self.max_count {
+            if state.current_number >= max_count {
+                info!(
+                    "Reached max number of {max_count} messages for test 
source with ID {}",
+                    self.id
+                );
+                return Ok(ProducedMessages {
+                    schema: Schema::Json,
+                    messages: vec![],
+                });
+            }
+        }
+
+        let messages = self.generate_messages();
+        state.current_number += messages.len();
+        info!(
+            "Generated {} messages by test source with ID {}",
+            messages.len(),
+            self.id
+        );
+        Ok(ProducedMessages {
+            schema: Schema::Json,
+            messages,
+        })
+    }
+
+    async fn close(&mut self) {
+        info!("Test source with ID {} is shutting down", self.id);
+    }
+}
+
/// Shape of a randomly generated test payload, serialized to JSON.
#[derive(Debug, Serialize, Deserialize)]
struct Record {
    title: String,
    name: String,
    age: u32,
    text: String,
}


Reply via email to