This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch connectors in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fa098d57fd375de0c5576ad5ef9c96bd9b7ec1bc Author: spetz <[email protected]> AuthorDate: Sat May 24 21:11:54 2025 +0200 feat(repo): add connectors runtime --- Cargo.lock | 352 ++++++++++++++++++++ Cargo.toml | 21 +- core/connectors/README.md | 23 ++ core/connectors/data_producer/Cargo.toml | 40 +++ core/connectors/data_producer/src/main.rs | 146 +++++++++ core/connectors/docker-compose.yml | 38 +++ core/connectors/runtime/Cargo.toml | 47 +++ core/connectors/runtime/config.toml | 131 ++++++++ core/connectors/runtime/src/main.rs | 361 +++++++++++++++++++++ core/connectors/runtime/src/sink.rs | 344 ++++++++++++++++++++ core/connectors/runtime/src/source.rs | 299 +++++++++++++++++ core/connectors/runtime/src/transform.rs | 66 ++++ core/connectors/sdk/Cargo.toml | 46 +++ core/connectors/sdk/src/decoders/json.rs | 37 +++ core/connectors/sdk/src/decoders/mod.rs | 21 ++ core/connectors/sdk/src/decoders/raw.rs | 31 ++ core/connectors/sdk/src/decoders/text.rs | 37 +++ core/connectors/sdk/src/encoders/json.rs | 39 +++ core/connectors/sdk/src/encoders/mod.rs | 21 ++ core/connectors/sdk/src/encoders/raw.rs | 34 ++ core/connectors/sdk/src/encoders/text.rs | 37 +++ core/connectors/sdk/src/lib.rs | 239 ++++++++++++++ core/connectors/sdk/src/sink.rs | 240 ++++++++++++++ core/connectors/sdk/src/source.rs | 218 +++++++++++++ core/connectors/sdk/src/transforms/add_fields.rs | 128 ++++++++ .../connectors/sdk/src/transforms/delete_fields.rs | 61 ++++ core/connectors/sdk/src/transforms/mod.rs | 45 +++ core/connectors/sinks/quickwit_sink/Cargo.toml | 46 +++ core/connectors/sinks/quickwit_sink/src/lib.rs | 209 ++++++++++++ core/connectors/sinks/stdout_sink/Cargo.toml | 43 +++ core/connectors/sinks/stdout_sink/src/lib.rs | 88 +++++ core/connectors/sources/test_source/Cargo.toml | 47 +++ core/connectors/sources/test_source/src/lib.rs | 157 +++++++++ core/server/Cargo.toml | 3 + 34 files changed, 3692 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 
96dee5e0..231ee179 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,6 +420,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" + [[package]] name = "arrayref" version = "0.3.9" @@ -595,6 +601,15 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1012,6 +1027,9 @@ name = "bitflags" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +dependencies = [ + "serde", +] [[package]] name = "bitvec" @@ -1406,6 +1424,12 @@ dependencies = [ "cc", ] +[[package]] +name = "cobs" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" + [[package]] name = "colorchoice" version = "1.0.3" @@ -1451,6 +1475,25 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "config" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "595aae20e65c3be792d05818e8c63025294ac3cb7e200f11459063a352a6ef80" +dependencies = [ + "async-trait", + "convert_case 0.6.0", + "json5", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "winnow 0.7.10", + "yaml-rust2", +] + [[package]] name = "console-api" version = "0.8.1" @@ -1620,6 +1663,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.2.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam" version = "0.8.4" @@ -1978,6 +2027,29 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "dlopen2" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1297103d2bbaea85724fcee6294c2d50b1081f9ad47d0f6f6f61eda65315a6" +dependencies = [ + "dlopen2_derive", + "libc", + "once_cell", + "winapi", +] + +[[package]] +name = "dlopen2_derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b99bf03862d7f545ebc28ddd33a665b50865f4dfd84031a393823879bd4c54" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "dlv-list" version = "0.5.2" @@ -2038,6 +2110,18 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "embedded-io" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef1a6892d9eef45c8fa6b9e0086428a2cca8491aca8f787c534a3d6d0bcb3ced" + +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -3016,6 +3100,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "halfbrown" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2c385c6df70fd180bbb673d93039dbd2cd34e41d782600bdf6e1ca7bce39aa" +dependencies = [ + "hashbrown 0.15.3", + "serde", +] + [[package]] name = "handlebars" version = "4.5.0" @@ -3030,6 +3124,15 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3056,6 +3159,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.3", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -3069,6 +3181,20 @@ dependencies = [ "num-traits", ] +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.5.0" @@ -3544,6 +3670,107 @@ dependencies = [ "tracing", ] +[[package]] +name = "iggy_connector_data_producer" +version = "0.1.0" +dependencies = [ + "chrono", + "iggy", + "rand 0.9.1", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "iggy_connector_quickwit_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "reqwest", + "serde", + "serde_yml", + "simd-json", + "tracing", +] + +[[package]] +name = "iggy_connector_runtime" +version = "0.1.0" +dependencies = [ + "config", + "dashmap", + "dlopen2", + "flume", + "futures", + "iggy", + "iggy_connector_sdk", + "mimalloc", + "once_cell", + "postcard", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "iggy_connector_sdk" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "dashmap", + "iggy", + "once_cell", + "postcard", + "serde", + 
"serde_json", + "simd-json", + "strum_macros", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "iggy_connector_stdout_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "dashmap", + "iggy_connector_sdk", + "once_cell", + "serde", + "tracing", +] + +[[package]] +name = "iggy_connector_test_source" +version = "0.1.0" +dependencies = [ + "async-trait", + "dashmap", + "humantime", + "iggy_connector_sdk", + "once_cell", + "rand 0.9.1", + "serde", + "simd-json", + "tokio", + "tracing", +] + [[package]] name = "iggy_examples" version = "0.0.5" @@ -3809,6 +4036,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "jsonwebtoken" version = "9.3.1" @@ -3983,6 +4221,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libyml" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980" +dependencies = [ + "anyhow", + "version_check", +] + [[package]] name = "libz-sys" version = "1.1.22" @@ -4815,6 +5063,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pathdiff" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" + [[package]] name = "pbkdf2" version = "0.12.2" @@ -4991,6 +5245,19 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "postcard" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"170a2601f67cc9dba8edd8c4870b15f71a6a2dc196daec8c83f72b59dff628a8" +dependencies = [ + "cobs", + "embedded-io 0.4.0", + "embedded-io 0.6.1", + "heapless", + "serde", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -5445,6 +5712,26 @@ dependencies = [ "thiserror 2.0.12", ] +[[package]] +name = "ref-cast" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0ae411dbe946a674d89546582cea4ba2bb8defac896622d6496f14c23ba5cf" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1165225c21bff1f3bbce98f5a1f889949bc902d3575308cc7b0de30b4f6d27c7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "regex" version = "1.11.1" @@ -5637,6 +5924,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "ron" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +dependencies = [ + "base64 0.21.7", + "bitflags 2.9.0", + "serde", + "serde_derive", +] + [[package]] name = "route-recognizer" version = "0.3.1" @@ -6104,6 +6403,21 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "serde_yml" +version = "0.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59e2dd588bf1597a252c3b920e0143eb99b0f76e4e082f4c92ce34fbc9e71ddd" +dependencies = [ + "indexmap 2.9.0", + "itoa", + "libyml", + "memchr", + "ryu", + "serde", + "version_check", +] + [[package]] name = "serial_test" version = "3.2.0" @@ -6253,6 +6567,21 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simd-json" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c962f626b54771990066e5435ec8331d1462576cd2d1e62f24076ae014f92112" +dependencies = [ + "getrandom 0.3.3", + "halfbrown", + "ref-cast", + "serde", + "serde_json", + "simdutf8", + "value-trait", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -7169,6 +7498,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "value-trait" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0508fce11ad19e0aab49ce20b6bec7f8f82902ded31df1c9fc61b90f0eb396b8" +dependencies = [ + "float-cmp", + "halfbrown", + "itoa", + "ryu", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -8036,6 +8377,17 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yaml-rust2" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18b783b2c2789414f8bb84ca3318fc9c2d7e7be1c22907d37839a58dedb369d3" +dependencies = [ + "arraydeque", + "encoding_rs", + "hashlink", +] + [[package]] name = "yansi" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 97035f30..3a4984f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,12 @@ members = [ "core/binary_protocol", "core/cli", "core/common", + "core/connectors/data_producer", + "core/connectors/runtime", + "core/connectors/sdk", + "core/connectors/sinks/quickwit_sink", + "core/connectors/sinks/stdout_sink", + "core/connectors/sources/test_source", "core/examples", "core/integration", "core/sdk", @@ -68,16 +74,18 @@ byte-unit = { version = "5.1.6", default-features = false, features = [ ] } bytes = "1.10.1" charming = "0.4.0" -chrono = "0.4.41" +chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.37", features = ["derive"] } +config = { version = "0.15.11" } comfy-table = "7.1.4" crc32fast = "1.4.2" crossbeam = "0.8.4" dashmap = "6.1.0" +derive_builder = "0.20.2" derive_more = { version = "2.0.1", 
features = ["full"] } derive-new = "0.7.0" dirs = "6.0.0" -derive_builder = "0.20.2" +dlopen2 = "0.7.0" enum_dispatch = "0.3.13" figlet-rs = "0.1.5" flume = "0.11.1" @@ -87,9 +95,11 @@ human-repr = "1.1.0" humantime = "2.2.0" keyring = { version = "3.6.2", features = ["sync-secret-service", "vendored"] } nonzero_lit = "0.1.2" +once_cell = "1.21.3" openssl = { version = "0.10.72", features = ["vendored"] } passterm = "=2.0.1" quinn = "0.11.8" +postcard = { version = "1.1.1", features = ["alloc"] } rand = "0.9.1" reqwest = { version = "0.12.15", default-features = false, features = [ "json", @@ -101,7 +111,9 @@ rustls = { version = "0.23.27", features = ["ring"] } serde = { version = "1.0.219", features = ["derive", "rc"] } serde_json = "1.0.140" serde_with = { version = "3.12.0", features = ["base64", "macros"] } +serde_yml = "0.0.12" serial_test = "3.2.0" +simd-json = { version = "0.15.1", features = ["serde_impl"] } sysinfo = "0.35.0" tempfile = "3.19.1" thiserror = "2.0.12" @@ -116,6 +128,7 @@ tracing-subscriber = { version = "0.3.19", default-features = false, features = "ansi", ] } uuid = { version = "1.16.0", features = [ + "v4", "v7", "fast-rng", "serde", @@ -123,6 +136,7 @@ uuid = { version = "1.16.0", features = [ ] } rust-s3 = { version = "0.35.1", features = ["default"] } strum = { version = "0.27.1", features = ["derive"] } +strum_macros = "0.27.1" aes-gcm = "0.10.3" base64 = "0.22.1" twox-hash = { version = "2.1.0", features = ["xxhash32"] } @@ -151,8 +165,9 @@ mimalloc = "0.1" console-subscriber = "0.4.1" # Path dependencies -iggy_common = { path = "core/common", version = "0.7.0" } iggy_binary_protocol = { path = "core/binary_protocol", version = "0.7.0" } +iggy_common = { path = "core/common", version = "0.7.0" } +iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.0" } iggy = { path = "core/sdk", version = "0.7.0" } server = { path = "core/server" } integration = { path = "core/integration" } diff --git a/core/connectors/README.md 
b/core/connectors/README.md new file mode 100644 index 00000000..0b362a55 --- /dev/null +++ b/core/connectors/README.md @@ -0,0 +1,23 @@ +# Apache Iggy Connectors + +This README is a work in progress. For now, you can do the following to test the connectors: + +1. Build the project in release mode, and make sure that the plugins specified under `path` in `core/connectors/runtime/config.toml` are available. The runtime configuration can be written in `toml`, `json` or `yaml` format. + +2. Run `docker compose up -d` from `core/connectors`, which will start the Quickwit server used by the example sink connector. + +3. Set the environment variable `IGGY_CONNECTORS_RUNTIME_CONFIG_PATH=core/connectors/runtime/config` so that it points to the runtime configuration file. + +4. Start the Iggy server and run the following commands to create the example streams and topics used by the connectors. + +``` +iggy --username iggy --password iggy stream create example +iggy --username iggy --password iggy stream create qw +iggy --username iggy --password iggy topic create qw records 1 none 1d +``` + +5. Execute `cargo r --bin iggy_connector_data_producer -r`, which will start the example data producer application, sending messages to the previously created `qw` stream and `records` topic. + +6. Start the connector runtime with `cargo r --bin iggy_connector_runtime -r`. You should be able to browse the Quickwit UI at `http://localhost:7280`, with records being constantly added to the `events` index (the index is defined in the Quickwit sink configuration). At the same time, you should see new messages being added to the `example` stream and `topic1` topic by the test source connector - you can use the Iggy Web UI to browse the data. + +A new connector can be built by implementing either the `Sink` or the `Source` trait. Please check the existing examples under the `core/connectors/sinks` and `core/connectors/sources` directories.
diff --git a/core/connectors/data_producer/Cargo.toml b/core/connectors/data_producer/Cargo.toml new file mode 100644 index 00000000..59945a16 --- /dev/null +++ b/core/connectors/data_producer/Cargo.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_data_producer" +version = "0.1.0" +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." 
+edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[dependencies] +chrono = { workspace = true } +iggy = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/core/connectors/data_producer/src/main.rs b/core/connectors/data_producer/src/main.rs new file mode 100644 index 00000000..2607d955 --- /dev/null +++ b/core/connectors/data_producer/src/main.rs @@ -0,0 +1,146 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use std::{env, str::FromStr, time::Duration}; + +use chrono::{DateTime, Days, Utc}; +use iggy::prelude::{ + Client, IggyClient, IggyClientBuilder, IggyDuration, IggyError, IggyMessage, Partitioning, +}; +use rand::{ + Rng, + distr::{Alphanumeric, Uniform}, +}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio::time::sleep; +use tracing::info; +use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt}; + +const SOURCES: [&str; 6] = ["browser", "mobile", "desktop", "email", "network", "other"]; +const STATES: [&str; 5] = ["active", "inactive", "blocked", "deleted", "unknown"]; +const DOMAINS: [&str; 5] = [ + "gmail.com", + "yahoo.com", + "hotmail.com", + "outlook.com", + "aol.com", +]; + +#[tokio::main] +async fn main() -> Result<(), DataProducerError> { + Registry::default() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) + .init(); + info!("Starting data producer..."); + let address = env::var("IGGY_ADDRESS").unwrap_or("localhost:8090".to_owned()); + let username = env::var("IGGY_USERNAME").unwrap_or("iggy".to_owned()); + let password = env::var("IGGY_PASSWORD").unwrap_or("iggy".to_owned()); + let stream = env::var("IGGY_STREAM").unwrap_or("qw".to_owned()); + let topic = env::var("IGGY_TOPIC").unwrap_or("records".to_owned()); + let client = create_client(&address, &username, &password).await?; + let mut producer = client + .producer(&stream, &topic)? 
+ .batch_size(1000) + .send_interval(IggyDuration::from_str("5ms").unwrap()) + .partitioning(Partitioning::balanced()) + .build(); + producer.init().await?; + + let mut rng = rand::rng(); + let mut batches_count = 0; + while batches_count < 100000 { + let records_count = rng.sample(Uniform::new(500u32, 1000).unwrap()); + let messages = (0..records_count) + .map(|_| random_record()) + .flat_map(|record| serde_json::to_string(&record).ok()) + .flat_map(|payload| IggyMessage::from_str(&payload).ok()) + .collect::<Vec<_>>(); + producer.send(messages).await?; + info!("Sent {records_count} messages"); + sleep(Duration::from_millis(10)).await; + batches_count += 1; + } + + info!("Reached maximum batches count"); + Ok(()) +} + +async fn create_client( + address: &str, + username: &str, + password: &str, +) -> Result<IggyClient, IggyError> { + let connection_string = format!("iggy://{username}:{password}@{address}"); + let client = IggyClientBuilder::from_connection_string(&connection_string)?.build()?; + client.connect().await?; + Ok(client) +} + +#[derive(Debug, Serialize, Deserialize)] +struct Record { + user_id: String, + user_type: u8, + email: String, + source: String, + state: String, + created_at: DateTime<Utc>, + message: String, +} + +fn random_record() -> Record { + let mut rng = rand::rng(); + let source = + SOURCES[rng.sample(Uniform::new(0u8, SOURCES.len() as u8).unwrap()) as usize].to_owned(); + let state = + STATES[rng.sample(Uniform::new(0u8, STATES.len() as u8).unwrap()) as usize].to_owned(); + let email = format!( + "{}@{}", + random_string(rng.sample(Uniform::new(3u32, 20).unwrap()) as usize), + DOMAINS[rng.sample(Uniform::new(0u8, DOMAINS.len() as u8).unwrap()) as usize] + ); + let created_at = Utc::now() + .checked_sub_days(Days::new(rng.sample(Uniform::new(0u64, 1000).unwrap()))) + .unwrap(); + Record { + user_id: format!("user_{}", rng.sample(Uniform::new(1u32, 100).unwrap())), + user_type: rng.sample(Uniform::new(1u8, 5).unwrap()), + email, + 
source, + state, + message: random_string(rng.sample(Uniform::new(10u32, 100).unwrap()) as usize), + created_at, + } +} + +fn random_string(size: usize) -> String { + let mut rng = rand::rng(); + let text: String = (0..size) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + text +} + +#[derive(Debug, Error)] +enum DataProducerError { + #[error("Iggy client error")] + IggyClient(#[from] iggy::prelude::ClientError), + #[error("Iggy error")] + IggyError(#[from] iggy::prelude::IggyError), +} diff --git a/core/connectors/docker-compose.yml b/core/connectors/docker-compose.yml new file mode 100644 index 00000000..5657e42e --- /dev/null +++ b/core/connectors/docker-compose.yml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +services: + quickwit: + image: quickwit/quickwit:edge + container_name: quickwit + restart: unless-stopped + volumes: + - quickwit:/quickwit/qwdata + ports: + - 7280:7280 + - 7281:7281 + command: run + environment: + - QW_ENABLE_OTLP_ENDPOINT=true + - QW_ENABLE_JAEGER_ENDPOINT=true + - QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER=true + - QW_ENABLE_INGEST_V2=true + - OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:7281 + - RUST_LOG=info,quickwit_actors=error,indexing_split_store=error,tantivy=error,quickwit_serve=warn,quickwit_indexing=warn,quickwit=warn + +volumes: + quickwit: diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml new file mode 100644 index 00000000..a975d0ae --- /dev/null +++ b/core/connectors/runtime/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_runtime" +version = "0.1.0" +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." 
+edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[dependencies] +config = { workspace = true } +dashmap = { workspace = true } +dlopen2 = { workspace = true } +flume = { workspace = true } +futures = { workspace = true } +iggy = { workspace = true } +iggy_connector_sdk = { workspace = true } +mimalloc = { workspace = true } +once_cell = { workspace = true } +postcard = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } diff --git a/core/connectors/runtime/config.toml b/core/connectors/runtime/config.toml new file mode 100644 index 00000000..618120dd --- /dev/null +++ b/core/connectors/runtime/config.toml @@ -0,0 +1,131 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[iggy] +address = "localhost:8090" +username = "iggy" +password = "iggy" +token = "secret" + +[sources.test1] +enabled = true +name = "Test source" +path = "target/release/libiggy_connector_test_source" + +[[sources.test1.streams]] +stream = "example" +topic = "topic1" +schema = "json" +batch_size = 1000 +send_interval = "5ms" + +[sources.test1.config] +interval = "100ms" +# max_count = 1000 +messages_range = [10, 50] +payload_size = 200 + +[sources.test1.transforms.add_fields] +enabled = true + +[[sources.test1.transforms.add_fields.fields]] +key = "test_field" +value.static = "hello!" + +[sinks.quickwit] +enabled = true +name = "Quickwit sink 1" +path = "target/release/libiggy_connector_quickwit_sink" + +[[sinks.quickwit.streams]] +stream = "qw" +topics = ["records"] +schema = "json" +batch_size = 1000 +poll_interval = "5ms" +consumer_group = "qw_sink_connector" + +[sinks.quickwit.transforms.add_fields] +enabled = true + +[[sinks.quickwit.transforms.add_fields.fields]] +key = "service_name" +value.static = "qw_connector" + +[[sinks.quickwit.transforms.add_fields.fields]] +key = "timestamp" +value.computed = "timestamp_millis" + +[[sinks.quickwit.transforms.add_fields.fields]] +key = "random_id" +value.computed = "uuid_v7" + +[sinks.quickwit.transforms.delete_fields] +enabled = true +fields = ["email", "created_at"] + +[sinks.quickwit.config] +url = "http://localhost:7280" +index = """ +version: 0.9 + +index_id: events + +doc_mapping: + mode: strict + field_mappings: + - name: timestamp + type: datetime + input_formats: [unix_timestamp] + output_format: unix_timestamp_nanos + indexed: false + fast: true + fast_precision: milliseconds + - name: service_name + type: text + tokenizer: raw + fast: true + - name: random_id + type: text + tokenizer: raw + fast: true + - name: user_id + type: text + tokenizer: raw + fast: true + - name: user_type + type: u64 + fast: true + - name: source + type: text + tokenizer: default + - name: state + type: text + tokenizer:
default + - name: message + type: text + tokenizer: default + + timestamp_field: timestamp + +indexing_settings: + commit_timeout_secs: 10 + +retention: + period: 7 days + schedule: daily +""" diff --git a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs new file mode 100644 index 00000000..99fe9eb6 --- /dev/null +++ b/core/connectors/runtime/src/main.rs @@ -0,0 +1,361 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+use config::{Config, Environment, File};
+use dlopen2::wrapper::{Container, WrapperApi};
+use iggy::prelude::{Client, IggyClient, IggyClientBuilder, IggyConsumer, IggyError, IggyProducer};
+use iggy_connector_sdk::{
+    Schema, StreamDecoder, StreamEncoder,
+    sink::ConsumeCallback,
+    source::{HandleCallback, SendCallback},
+    transforms::{Transform, TransformType},
+};
+use mimalloc::MiMalloc;
+use serde::{Deserialize, Serialize};
+use std::{
+    collections::HashMap,
+    env,
+    path::Path,
+    sync::{Arc, atomic::AtomicU32},
+};
+use thiserror::Error;
+use tracing::{error, info};
+use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt, util::SubscriberInitExt};
+
+mod sink;
+mod source;
+mod transform;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
+/// Symbols exported by a sink plugin library through the C ABI.
+#[derive(WrapperApi)]
+pub struct SinkApi {
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> i32,
+    // The callback mirrors the plugin ABI, so its arity is fixed by the plugin contract.
+    #[allow(clippy::too_many_arguments)]
+    consume: extern "C" fn(
+        id: u32,
+        topic_meta_ptr: *const u8,
+        topic_meta_len: usize,
+        messages_meta_ptr: *const u8,
+        messages_meta_len: usize,
+        messages_ptr: *const u8,
+        messages_len: usize,
+    ) -> i32,
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+/// Symbols exported by a source plugin library through the C ABI.
+#[derive(WrapperApi)]
+struct SourceApi {
+    open: extern "C" fn(id: u32, config_ptr: *const u8, config_len: usize) -> i32,
+    handle: extern "C" fn(id: u32, callback: SendCallback) -> i32,
+    close: extern "C" fn(id: u32) -> i32,
+}
+
+/// Next plugin instance ID, shared by the sink and source initializers.
+static PLUGIN_ID: AtomicU32 = AtomicU32::new(1);
+
+/// A loaded sink library together with the plugin instances opened from it.
+struct SinkConnector {
+    container: Container<SinkApi>,
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+/// One opened sink plugin instance and the consumers feeding it.
+struct SinkConnectorPlugin {
+    id: u32,
+    consumers: Vec<SinkConnectorConsumer>,
+}
+
+/// A consumer for a single stream/topic together with its decoding and transform settings.
+struct SinkConnectorConsumer {
+    batch_size: u32,
+    consumer: IggyConsumer,
+    decoder: Arc<dyn StreamDecoder>,
+    transforms: Vec<Arc<dyn Transform>>,
+}
+
+/// The sink's `consume` entry point detached from its container, handed to the consume tasks.
+struct SinkConnectorWrapper {
+    callback: ConsumeCallback,
+    plugins: Vec<SinkConnectorPlugin>,
+}
+
+/// What remains of a sink after its wrapper is handed out: needed to close its plugins.
+struct SinkWithPlugins {
+    container: Container<SinkApi>,
+    plugin_ids: Vec<u32>,
+}
+
+/// A loaded source library together with the plugin instances opened from it.
+struct SourceConnector {
+    container: Container<SourceApi>,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+/// One opened source plugin instance, its transforms and its (single) producer.
+struct SourceConnectorPlugin {
+    id: u32,
+    transforms: Vec<Arc<dyn Transform>>,
+    producer: Option<SourceConnectorProducer>,
+}
+
+/// The producer used by a source plugin together with the payload encoder.
+struct SourceConnectorProducer {
+    encoder: Arc<dyn StreamEncoder>,
+    producer: IggyProducer,
+}
+
+/// What remains of a source after its wrapper is handed out: needed to close its plugins.
+struct SourceWithPlugins {
+    container: Container<SourceApi>,
+    plugin_ids: Vec<u32>,
+}
+
+/// The source's `handle` entry point detached from its container, handed to the handlers.
+struct SourceConnectorWrapper {
+    callback: HandleCallback,
+    plugins: Vec<SourceConnectorPlugin>,
+}
+
+/// Entry point of the connectors runtime.
+///
+/// Loads the configuration, connects one Iggy client for producers and one for consumers,
+/// initializes every enabled source and sink, spawns their workers and then waits for a
+/// shutdown signal before closing every plugin instance and both clients.
+#[tokio::main]
+async fn main() -> Result<(), RuntimeError> {
+    Registry::default()
+        .with(tracing_subscriber::fmt::layer())
+        .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("INFO")))
+        .init();
+    let config_path =
+        env::var("IGGY_CONNECTORS_RUNTIME_CONFIG_PATH").unwrap_or_else(|_| "config".to_string());
+    info!("Starting Iggy Connector Runtime, loading configuration from: {config_path}...");
+    let config = load_config(&config_path)?;
+
+    // Missing credentials are a configuration problem, so report them instead of panicking.
+    let (Some(iggy_username), Some(iggy_password)) =
+        (config.iggy.username, config.iggy.password)
+    else {
+        error!("Iggy username and password must be set.");
+        return Err(RuntimeError::InvalidConfig);
+    };
+    let iggy_address = config.iggy.address;
+    // `create_client` already connects, so there is no need to connect the clients again here.
+    let consumer_client = create_client(&iggy_address, &iggy_username, &iggy_password).await?;
+    let producer_client = create_client(&iggy_address, &iggy_username, &iggy_password).await?;
+
+    let sources = source::init(config.sources, &producer_client).await?;
+    let sinks = sink::init(config.sinks, &consumer_client).await?;
+
+    let mut sink_wrappers = vec![];
+    let mut sink_with_plugins = HashMap::new();
+    for (key, sink) in sinks {
+        let plugin_ids = sink.plugins.iter().map(|plugin| plugin.id).collect();
+        sink_wrappers.push(SinkConnectorWrapper {
+            callback: sink.container.consume,
+            plugins: sink.plugins,
+        });
+        sink_with_plugins.insert(
+            key,
+            SinkWithPlugins {
+                container: sink.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    let mut source_wrappers = vec![];
+    let mut source_with_plugins = HashMap::new();
+    for (key, source) in sources {
+        let plugin_ids = source.plugins.iter().map(|plugin| plugin.id).collect();
+        source_wrappers.push(SourceConnectorWrapper {
+            callback: source.container.handle,
+            plugins: source.plugins,
+        });
+        source_with_plugins.insert(
+            key,
+            SourceWithPlugins {
+                container: source.container,
+                plugin_ids,
+            },
+        );
+    }
+
+    source::handle(source_wrappers);
+    sink::consume(sink_wrappers);
+    info!("All sources and sinks spawned.");
+
+    wait_for_shutdown_signal().await;
+
+    for (key, source) in source_with_plugins {
+        for plugin_id in source.plugin_ids {
+            info!("Closing source connector with ID: {plugin_id} for plugin: {key}");
+            source.container.close(plugin_id);
+            info!("Closed source connector with ID: {plugin_id} for plugin: {key}");
+        }
+    }
+
+    for (key, sink) in sink_with_plugins {
+        for plugin_id in sink.plugin_ids {
+            info!("Closing sink connector with ID: {plugin_id} for plugin: {key}");
+            sink.container.close(plugin_id);
+            info!("Closed sink connector with ID: {plugin_id} for plugin: {key}");
+        }
+    }
+
+    producer_client.shutdown().await?;
+    consumer_client.shutdown().await?;
+
+    info!("All connectors closed.");
+    Ok(())
+}
+
+/// Reads the runtime configuration from `config_path`, overridden by `IGGY_`-prefixed
+/// environment variables (with `_` as the nesting separator).
+///
+/// # Errors
+/// Logs the underlying cause and returns [`RuntimeError::InvalidConfig`] when the
+/// configuration cannot be read or deserialized.
+fn load_config(config_path: &str) -> Result<RuntimeConfig, RuntimeError> {
+    Config::builder()
+        .add_source(File::with_name(config_path))
+        .add_source(Environment::with_prefix("IGGY").separator("_"))
+        .build()
+        .and_then(|config| config.try_deserialize())
+        .map_err(|error| {
+            error!("Failed to load config from: {config_path}. {error}");
+            RuntimeError::InvalidConfig
+        })
+}
+
+/// Waits until the process is asked to shut down.
+///
+/// On Unix this is SIGINT or SIGTERM. On other platforms it falls back to Ctrl+C, so the
+/// runtime no longer proceeds straight to shutdown there.
+async fn wait_for_shutdown_signal() {
+    #[cfg(unix)]
+    {
+        use tokio::signal::unix::{SignalKind, signal};
+        let mut ctrl_c = signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal");
+        let mut sigterm =
+            signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal");
+        tokio::select! {
+            _ = ctrl_c.recv() => {
+                info!("Received SIGINT. Shutting down connectors...");
+            },
+            _ = sigterm.recv() => {
+                info!("Received SIGTERM. Shutting down connectors...");
+            }
+        }
+    }
+
+    #[cfg(not(unix))]
+    {
+        if let Err(error) = tokio::signal::ctrl_c().await {
+            error!("Failed to listen for the Ctrl+C signal. {error}");
+        }
+        info!("Received Ctrl+C. Shutting down connectors...");
+    }
+}
+
+/// Builds an Iggy client from the given credentials and connects it.
+///
+/// # Errors
+/// Returns an error when the connection string is invalid or the connection fails.
+async fn create_client(
+    address: &str,
+    username: &str,
+    password: &str,
+) -> Result<IggyClient, IggyError> {
+    let connection_string = format!("iggy://{username}:{password}@{address}");
+    let client = IggyClientBuilder::from_connection_string(&connection_string)?.build()?;
+    client.connect().await?;
+    Ok(client)
+}
+
+/// Top-level runtime configuration. Missing `sinks`/`sources` sections mean "none".
+#[derive(Debug, Deserialize, Serialize)]
+struct RuntimeConfig {
+    iggy: IggyConfig,
+    #[serde(default)]
+    sinks: HashMap<String, SinkConfig>,
+    #[serde(default)]
+    sources: HashMap<String, SourceConfig>,
+}
+
+/// Connection settings for the Iggy server.
+#[derive(Debug, Serialize, Deserialize)]
+struct IggyConfig {
+    address: String,
+    username: Option<String>,
+    password: Option<String>,
+    token: Option<String>,
+}
+
+/// Configuration of a single sink plugin instance.
+#[derive(Debug, Serialize, Deserialize)]
+struct SinkConfig {
+    enabled: bool,
+    name: String,
+    path: String,
+    transforms: Option<TransformsConfig>,
+    streams: Vec<StreamConsumerConfig>,
+    config: Option<serde_json::Value>,
+}
+
+/// Which stream and topics a sink consumes from, and how.
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamConsumerConfig {
+    stream: String,
+    topics: Vec<String>,
+    schema: Schema,
+    batch_size: Option<u32>,
+    poll_interval: Option<String>,
+    consumer_group: Option<String>,
+}
+
+/// Which stream and topic a source produces to, and how.
+#[derive(Debug, Serialize, Deserialize)]
+struct StreamProducerConfig {
+    stream: String,
+    topic: String,
+    schema: Schema,
+    batch_size: Option<u32>,
+    send_interval: Option<String>,
+}
+
+/// Configuration of a single source plugin instance.
+#[derive(Debug, Serialize, Deserialize)]
+struct SourceConfig {
+    enabled: bool,
+    name: String,
+    path: String,
+    transforms: Option<TransformsConfig>,
+    streams: Vec<StreamProducerConfig>,
+    config: Option<serde_json::Value>,
+}
+
+/// Transform configs keyed by transform type; each value is parsed by the matching transform.
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct TransformsConfig {
+    #[serde(flatten)]
+    transforms: HashMap<TransformType, serde_json::Value>,
+}
+
+/// The part of every transform config read before the full config: whether it is enabled.
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct InitialTransformConfig {
+    enabled: bool,
+}
+
+#[derive(Debug, Error)]
+pub enum RuntimeError {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    #[error("Failed to serialize topic metadata")]
+    FailedToSerializeTopicMetadata,
+    #[error("Failed to serialize messages metadata")]
+    FailedToSerializeMessagesMetadata,
+    #[error("Failed to serialize raw messages")]
+    FailedToSerializeRawMessages,
+    #[error("Failed to serialize headers")]
+    FailedToSerializeHeaders,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("Invalid transform")]
+    InvalidTransform,
+    #[error("Invalid sink")]
+    InvalidSink,
+    #[error("Connector SDK error")]
+    ConnectorSdkError(#[from] iggy_connector_sdk::Error),
+    #[error("Iggy client error")]
+    IggyClient(#[from] iggy::prelude::ClientError),
+    #[error("Iggy error")]
+    IggyError(#[from] iggy::prelude::IggyError),
+}
+
+const ALLOWED_PLUGIN_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
+
+/// Returns `path` unchanged when its file name already ends with a known plugin extension
+/// (`so`, `dylib` or `dll`). Otherwise appends the current platform's dynamic-library
+/// extension (`std::env::consts::DLL_EXTENSION`).
+pub fn resolve_plugin_path(path: &str) -> String {
+    let extension = Path::new(path)
+        .extension()
+        .and_then(|extension| extension.to_str())
+        .unwrap_or_default();
+    if ALLOWED_PLUGIN_EXTENSIONS.contains(&extension) {
+        return path.to_string();
+    }
+
+    format!("{path}.{}", env::consts::DLL_EXTENSION)
+}
diff --git
a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs new file mode 100644 index 00000000..fd73fd4a --- /dev/null +++ b/core/connectors/runtime/src/sink.rs @@ -0,0 +1,344 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+use dlopen2::wrapper::Container;
+use futures::StreamExt;
+use iggy::prelude::{
+    AutoCommit, AutoCommitWhen, IggyClient, IggyConsumer, IggyDuration, IggyMessage,
+    PollingStrategy,
+};
+use iggy_connector_sdk::{
+    DecodedMessage, MessagesMetadata, RawMessage, RawMessages, ReceivedMessage, StreamDecoder,
+    TopicMetadata, sink::ConsumeCallback, transforms::Transform,
+};
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+    time::Instant,
+};
+use tracing::{error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SinkApi, SinkConfig, SinkConnector, SinkConnectorConsumer,
+    SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform,
+};
+
+/// Loads every enabled sink plugin and opens one plugin instance per config entry. It then
+/// creates a consumer-group consumer for each configured stream/topic pair.
+///
+/// Entries that resolve to the same library path share a single loaded container.
+///
+/// # Errors
+/// Returns an error when building or initializing an Iggy consumer fails.
+///
+/// # Panics
+/// Panics when a plugin library cannot be loaded, when a transforms config is invalid or
+/// when a poll interval cannot be parsed.
+pub async fn init(
+    sink_configs: HashMap<String, SinkConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
+    let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
+    for (key, config) in sink_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Sink: {name} is disabled ({key})");
+            continue;
+        }
+
+        // Reserve the ID in one atomic step instead of a separate load and fetch_add.
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        info!("Initializing sink container with name: {name} ({key}), plugin: {path}");
+        if let Some(container) = sink_connectors.get_mut(&path) {
+            info!("Sink container for plugin: {path} is already loaded.");
+            init_sink(
+                &container.container,
+                &config.config.unwrap_or_default(),
+                plugin_id,
+            );
+            container.plugins.push(SinkConnectorPlugin {
+                id: plugin_id,
+                consumers: vec![],
+            });
+        } else {
+            // SAFETY: loading runs the library's initializers; the runtime trusts the
+            // plugin path provided in its configuration.
+            let container: Container<SinkApi> =
+                unsafe { Container::load(&path).expect("Failed to load sink container") };
+            info!("Sink container for plugin: {path} loaded successfully.");
+            init_sink(&container, &config.config.unwrap_or_default(), plugin_id);
+            sink_connectors.insert(
+                path.to_owned(),
+                SinkConnector {
+                    container,
+                    plugins: vec![SinkConnectorPlugin {
+                        id: plugin_id,
+                        consumers: vec![],
+                    }],
+                },
+            );
+        }
+
+        info!(
+            "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+        );
+
+        let transforms = if let Some(transforms_config) = config.transforms {
+            let transforms = transform::load(transforms_config).expect("Failed to load transforms");
+            let types = transforms
+                .iter()
+                .map(|t| t.r#type().into())
+                .collect::<Vec<&'static str>>()
+                .join(", ");
+            info!("Enabled transforms for sink: {name} ({key}): {types}");
+            transforms
+        } else {
+            vec![]
+        };
+
+        let connector = sink_connectors
+            .get_mut(&path)
+            .expect("Failed to get sink connector");
+        let plugin = connector
+            .plugins
+            .iter_mut()
+            .find(|p| p.id == plugin_id)
+            .expect("Failed to get sink plugin");
+
+        for stream in config.streams {
+            let poll_interval =
+                IggyDuration::from_str(stream.poll_interval.as_deref().unwrap_or("5ms"))
+                    .expect("Invalid poll interval");
+            let consumer_group = stream
+                .consumer_group
+                .unwrap_or_else(|| format!("iggy-connect-{key}"));
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            for topic in stream.topics {
+                let mut consumer = iggy_client
+                    .consumer_group(&consumer_group, &stream.stream, &topic)?
+                    .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                    .create_consumer_group_if_not_exists()
+                    .auto_join_consumer_group()
+                    .polling_strategy(PollingStrategy::next())
+                    .poll_interval(poll_interval)
+                    .batch_size(batch_size)
+                    .build();
+
+                consumer.init().await?;
+                plugin.consumers.push(SinkConnectorConsumer {
+                    consumer,
+                    decoder: stream.schema.decoder(),
+                    batch_size,
+                    transforms: transforms.clone(),
+                });
+            }
+        }
+    }
+
+    Ok(sink_connectors)
+}
+
+/// Spawns one Tokio task per sink consumer that feeds polled batches to the plugin.
+pub fn consume(sinks: Vec<SinkConnectorWrapper>) {
+    for sink in sinks {
+        let callback = sink.callback;
+        for plugin in sink.plugins {
+            let plugin_id = plugin.id;
+            info!("Starting consume for sink with ID: {plugin_id}...");
+            for consumer in plugin.consumers {
+                tokio::spawn(async move {
+                    consume_messages(
+                        plugin_id,
+                        consumer.decoder,
+                        consumer.batch_size,
+                        callback,
+                        consumer.transforms,
+                        consumer.consumer,
+                    )
+                    .await
+                    .expect("Failed to consume messages")
+                });
+            }
+            info!("Consume for sink with ID: {plugin_id} started successfully.");
+        }
+    }
+}
+
+/// Drives a consumer and forwards batches of messages to the plugin.
+///
+/// A batch is flushed once it reaches `batch_size` messages, or when the received message's
+/// offset equals the partition's current offset (presumably the last available message).
+/// Failed batches are logged and skipped.
+async fn consume_messages(
+    plugin_id: u32,
+    decoder: Arc<dyn StreamDecoder>,
+    batch_size: u32,
+    consume: ConsumeCallback,
+    transforms: Vec<Arc<dyn Transform>>,
+    mut consumer: IggyConsumer,
+) -> Result<(), RuntimeError> {
+    info!("Started consuming messages for plugin with ID: {plugin_id}");
+    let batch_size = batch_size as usize;
+
+    let mut batch = Vec::with_capacity(batch_size);
+    let topic_metadata = TopicMetadata {
+        stream: consumer.stream().to_string(),
+        topic: consumer.topic().to_string(),
+    };
+
+    while let Some(message) = consumer.next().await {
+        let message = match message {
+            Ok(message) => message,
+            Err(error) => {
+                error!("Failed to receive message. {error}");
+                continue;
+            }
+        };
+
+        let partition_id = message.partition_id;
+        let current_offset = message.current_offset;
+        let message_offset = message.message.header.offset;
+        batch.push(message.message);
+        if current_offset != message_offset && batch.len() < batch_size {
+            continue;
+        }
+
+        // Swap in a pre-sized buffer so the next batch does not start from zero capacity.
+        let messages = std::mem::replace(&mut batch, Vec::with_capacity(batch_size));
+        let messages_count = messages.len();
+        let messages_metadata = MessagesMetadata {
+            partition_id,
+            current_offset,
+            schema: decoder.schema(),
+        };
+        info!("Processing {messages_count} messages");
+        let start = Instant::now();
+        if let Err(error) = process_messages(
+            plugin_id,
+            messages_metadata,
+            &topic_metadata,
+            messages,
+            &consume,
+            &transforms,
+            &decoder,
+        )
+        .await
+        {
+            error!("Failed to process {messages_count} messages. {error}");
+            continue;
+        }
+
+        let elapsed = start.elapsed();
+        info!("Consumed {messages_count} messages in {:#?}", elapsed);
+    }
+    info!("Stopped consuming messages for plugin with ID: {plugin_id}");
+    Ok(())
+}
+
+/// Decodes, transforms and serializes a batch, then hands it to the plugin's `consume`
+/// entry point.
+///
+/// Some messages are skipped and logged without affecting the rest of the batch:
+/// those that fail to decode, are dropped by a transform, lack an offset, cannot be turned
+/// into raw bytes, or whose headers fail to serialize.
+///
+/// # Errors
+/// Returns an error when a transform fails or when the batch metadata or messages cannot be
+/// serialized.
+async fn process_messages(
+    plugin_id: u32,
+    messages_metadata: MessagesMetadata,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<IggyMessage>,
+    consume: &ConsumeCallback,
+    transforms: &[Arc<dyn Transform>],
+    decoder: &Arc<dyn StreamDecoder>,
+) -> Result<(), RuntimeError> {
+    let count = messages.len();
+    let decoded_messages = messages
+        .into_iter()
+        .map(|message| ReceivedMessage {
+            id: message.header.id,
+            offset: message.header.offset,
+            headers: message.user_headers_map().unwrap_or_default(),
+            payload: message.payload.into(),
+        })
+        .filter_map(|message| match decoder.decode(message.payload) {
+            Ok(payload) => Some(DecodedMessage {
+                id: Some(message.id),
+                offset: Some(message.offset),
+                headers: message.headers,
+                payload,
+            }),
+            Err(error) => {
+                error!(
+                    "Failed to decode message with offset: {}. {error}",
+                    message.offset
+                );
+                None
+            }
+        });
+
+    let mut raw_messages = Vec::with_capacity(count);
+    for message in decoded_messages {
+        let mut current_message = Some(message);
+        for transform in transforms {
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let Some(offset) = message.offset else {
+            error!("Offset should be present.");
+            continue;
+        };
+
+        let Ok(payload) = message.payload.try_into_vec() else {
+            error!("Failed to get raw payload.");
+            continue;
+        };
+
+        // A header serialization failure only skips this message, not the whole batch.
+        let headers = match message.headers {
+            Some(headers) => match postcard::to_allocvec(&headers) {
+                Ok(headers) => headers,
+                Err(error) => {
+                    error!("Failed to serialize headers. {error}");
+                    continue;
+                }
+            },
+            None => vec![],
+        };
+
+        raw_messages.push(RawMessage {
+            offset,
+            headers,
+            payload,
+        });
+    }
+
+    let topic_meta = postcard::to_allocvec(topic_metadata).map_err(|error| {
+        error!("Failed to serialize topic metadata. {error}");
+        RuntimeError::FailedToSerializeTopicMetadata
+    })?;
+
+    let messages_meta = postcard::to_allocvec(&messages_metadata).map_err(|error| {
+        error!("Failed to serialize messages metadata. {error}");
+        RuntimeError::FailedToSerializeMessagesMetadata
+    })?;
+
+    let messages = postcard::to_allocvec(&RawMessages {
+        schema: decoder.schema(),
+        messages: raw_messages,
+    })
+    .map_err(|error| {
+        error!("Failed to serialize raw messages. {error}");
+        RuntimeError::FailedToSerializeRawMessages
+    })?;
+
+    // NOTE(review): the plugin's i32 status code is not inspected; its meaning is defined by
+    // the plugin contract, which is not visible here.
+    (consume)(
+        plugin_id,
+        topic_meta.as_ptr(),
+        topic_meta.len(),
+        messages_meta.as_ptr(),
+        messages_meta.len(),
+        messages.as_ptr(),
+        messages.len(),
+    );
+
+    Ok(())
+}
+
+/// Serializes the plugin config to JSON and passes it to the plugin's `open` entry point for
+/// instance `id`. The returned status code is not inspected.
+fn init_sink(container: &Container<SinkApi>, config: &serde_json::Value, id: u32) {
+    let config = serde_json::to_string(config).expect("Invalid sink config.");
+    (container.open)(id, config.as_ptr(), config.len());
+}
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
new file mode 100644
index 00000000..ac5be012
--- /dev/null
+++ b/core/connectors/runtime/src/source.rs
@@ -0,0 +1,299 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use dashmap::DashMap;
+use dlopen2::wrapper::Container;
+use flume::{Receiver, Sender};
+use iggy::prelude::{HeaderKey, HeaderValue, IggyClient, IggyDuration, IggyError, IggyMessage};
+use iggy_connector_sdk::{
+    DecodedMessage, Error, ProducedMessages, StreamEncoder, TopicMetadata, transforms::Transform,
+};
+use once_cell::sync::Lazy;
+use std::{
+    collections::HashMap,
+    str::FromStr,
+    sync::{Arc, atomic::Ordering},
+};
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    PLUGIN_ID, RuntimeError, SourceApi, SourceConfig, SourceConnector, SourceConnectorPlugin,
+    SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path, transform,
+};
+
+/// Channel senders keyed by plugin ID; `handle_produced_messages` routes plugin batches here.
+pub static SOURCE_SENDERS: Lazy<DashMap<u32, Sender<ProducedMessages>>> = Lazy::new(DashMap::new);
+
+/// Loads every enabled source plugin, opens one plugin instance per config entry and creates
+/// its producer.
+///
+/// Entries that resolve to the same library path share a single loaded container. A plugin
+/// instance holds a single producer; when several streams are configured, the last one wins
+/// and a warning is logged.
+///
+/// # Errors
+/// Returns an error when building or initializing an Iggy producer fails.
+///
+/// # Panics
+/// Panics when a plugin library cannot be loaded, when a transforms config is invalid or
+/// when a send interval cannot be parsed.
+pub async fn init(
+    source_configs: HashMap<String, SourceConfig>,
+    iggy_client: &IggyClient,
+) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
+    let mut source_connectors: HashMap<String, SourceConnector> = HashMap::new();
+    for (key, config) in source_configs {
+        let name = config.name;
+        if !config.enabled {
+            warn!("Source: {name} is disabled ({key})");
+            continue;
+        }
+
+        // Reserve the ID in one atomic step instead of a separate load and fetch_add.
+        let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::Relaxed);
+        let path = resolve_plugin_path(&config.path);
+        info!("Initializing source container with name: {name} ({key}), plugin: {path}");
+        if let Some(container) = source_connectors.get_mut(&path) {
+            info!("Source container for plugin: {path} is already loaded.");
+            init_source(
+                &container.container,
+                &config.config.unwrap_or_default(),
+                plugin_id,
+            );
+            container.plugins.push(SourceConnectorPlugin {
+                id: plugin_id,
+                producer: None,
+                transforms: vec![],
+            });
+        } else {
+            // SAFETY: loading runs the library's initializers; the runtime trusts the
+            // plugin path provided in its configuration.
+            let container: Container<SourceApi> =
+                unsafe { Container::load(&path).expect("Failed to load source container") };
+            info!("Source container for plugin: {path} loaded successfully.");
+            init_source(&container, &config.config.unwrap_or_default(), plugin_id);
+            source_connectors.insert(
+                path.to_owned(),
+                SourceConnector {
+                    container,
+                    plugins: vec![SourceConnectorPlugin {
+                        id: plugin_id,
+                        producer: None,
+                        transforms: vec![],
+                    }],
+                },
+            );
+        }
+
+        info!(
+            "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}."
+        );
+
+        let transforms = if let Some(transforms_config) = config.transforms {
+            let transforms = transform::load(transforms_config).expect("Failed to load transforms");
+            let types = transforms
+                .iter()
+                .map(|t| t.r#type().into())
+                .collect::<Vec<&'static str>>()
+                .join(", ");
+            info!("Enabled transforms for source: {name} ({key}): {types}");
+            transforms
+        } else {
+            vec![]
+        };
+
+        let connector = source_connectors
+            .get_mut(&path)
+            .expect("Failed to get source connector");
+        let plugin = connector
+            .plugins
+            .iter_mut()
+            .find(|p| p.id == plugin_id)
+            .expect("Failed to get source plugin");
+
+        for stream in config.streams {
+            if plugin.producer.is_some() {
+                warn!(
+                    "Source: {name} ({key}) supports a single stream, the producer for stream: {}, topic: {} replaces the previously configured one.",
+                    stream.stream, stream.topic
+                );
+            }
+
+            let send_interval =
+                IggyDuration::from_str(stream.send_interval.as_deref().unwrap_or("5ms"))
+                    .expect("Invalid send interval");
+            let batch_size = stream.batch_size.unwrap_or(1000);
+            let mut producer = iggy_client
+                .producer(&stream.stream, &stream.topic)?
+                .send_interval(send_interval)
+                .batch_size(batch_size)
+                .build();
+
+            producer.init().await?;
+            plugin.producer = Some(SourceConnectorProducer {
+                producer,
+                encoder: stream.schema.encoder(),
+            });
+        }
+        plugin.transforms = transforms;
+    }
+
+    Ok(source_connectors)
+}
+
+/// Starts every source plugin: each gets a blocking handler thread running the plugin's
+/// `handle` entry point and an async task that forwards its produced batches to Iggy.
+///
+/// Plugins without an initialized producer (no streams configured) are skipped with an
+/// error log.
+pub fn handle(sources: Vec<SourceConnectorWrapper>) {
+    for source in sources {
+        let handle = source.callback;
+        for plugin in source.plugins {
+            let plugin_id = plugin.id;
+            let Some(producer) = plugin.producer else {
+                error!("Producer for source with ID: {plugin_id} is not initialized, skipping it.");
+                continue;
+            };
+            let transforms = plugin.transforms;
+
+            // Register the sender before starting the plugin. Otherwise, batches produced
+            // immediately would find no sender and be dropped by `handle_produced_messages`.
+            let (sender, receiver): (Sender<ProducedMessages>, Receiver<ProducedMessages>) =
+                flume::unbounded();
+            SOURCE_SENDERS.insert(plugin_id, sender);
+
+            info!("Starting handler for source with ID: {plugin_id}...");
+            tokio::task::spawn_blocking(move || {
+                handle(plugin_id, handle_produced_messages);
+            });
+            info!("Handler for source with ID: {plugin_id} started successfully.");
+
+            tokio::spawn(forward_produced_messages(
+                plugin_id,
+                producer,
+                transforms,
+                receiver,
+            ));
+        }
+    }
+}
+
+/// Receives batches produced by plugin `plugin_id`, then decodes them with their declared
+/// schema, applies the transforms, encodes them and sends them with the producer.
+/// Runs until every sender of the channel is dropped.
+async fn forward_produced_messages(
+    plugin_id: u32,
+    producer: SourceConnectorProducer,
+    transforms: Vec<Arc<dyn Transform>>,
+    receiver: Receiver<ProducedMessages>,
+) {
+    info!("Source #{plugin_id} started");
+    let encoder = producer.encoder;
+    let producer = producer.producer;
+    let mut number: u64 = 1;
+
+    let topic_metadata = TopicMetadata {
+        stream: producer.stream().to_string(),
+        topic: producer.topic().to_string(),
+    };
+
+    while let Ok(received_messages) = receiver.recv_async().await {
+        let count = received_messages.messages.len();
+        info!("[Source #{plugin_id}] received {count} messages");
+        let schema = received_messages.schema;
+        let mut messages: Vec<DecodedMessage> = Vec::with_capacity(count);
+        for message in received_messages.messages {
+            let Ok(payload) = schema.try_into_payload(message.payload) else {
+                error!("Failed to decode message payload with schema: {schema}");
+                continue;
+            };
+
+            debug!(
+                "[Source #{plugin_id}] Message: {number} | Schema: {schema} | Payload: {payload}"
+            );
+            messages.push(DecodedMessage {
+                id: message.id,
+                offset: None,
+                headers: message.headers,
+                payload,
+            });
+            number += 1;
+        }
+
+        let Ok(iggy_messages) = process_messages(&encoder, &topic_metadata, messages, &transforms)
+        else {
+            error!(
+                "Failed to process {count} messages before sending them to stream: {}, topic: {}.",
+                producer.stream(),
+                producer.topic()
+            );
+            continue;
+        };
+
+        if let Err(error) = producer.send(iggy_messages).await {
+            error!(
+                "Failed to send {count} messages to stream: {}, topic: {}. {error}",
+                producer.stream(),
+                producer.topic(),
+            );
+            continue;
+        }
+
+        info!(
+            "Sent {count} messages to stream: {}, topic: {}",
+            producer.stream(),
+            producer.topic()
+        );
+    }
+}
+
+/// Applies `transforms` to every message, then encodes the survivors and converts them into
+/// Iggy messages. Messages that fail to encode or build are skipped with an error log.
+///
+/// # Errors
+/// Returns the first error raised by a transform.
+fn process_messages(
+    encoder: &Arc<dyn StreamEncoder>,
+    topic_metadata: &TopicMetadata,
+    messages: Vec<DecodedMessage>,
+    transforms: &[Arc<dyn Transform>],
+) -> Result<Vec<IggyMessage>, Error> {
+    let mut iggy_messages = Vec::with_capacity(messages.len());
+    for message in messages {
+        let mut current_message = Some(message);
+        for transform in transforms {
+            let Some(message) = current_message else {
+                break;
+            };
+
+            current_message = transform.transform(topic_metadata, message)?;
+        }
+
+        // The transform may return no message based on some conditions
+        let Some(message) = current_message else {
+            continue;
+        };
+
+        let payload = match encoder.encode(message.payload) {
+            Ok(payload) => payload,
+            Err(error) => {
+                error!("Failed to encode message payload. {error}");
+                continue;
+            }
+        };
+
+        match build_iggy_message(payload, message.id, message.headers) {
+            Ok(iggy_message) => iggy_messages.push(iggy_message),
+            Err(error) => error!("Failed to build Iggy message. {error}"),
+        }
+    }
+    Ok(iggy_messages)
+}
+
+/// Serializes the plugin config to JSON and passes it to the plugin's `open` entry point for
+/// instance `id`. The returned status code is not inspected.
+fn init_source(container: &Container<SourceApi>, config: &serde_json::Value, id: u32) {
+    let config = serde_json::to_string(config).expect("Invalid source config.");
+    (container.open)(id, config.as_ptr(), config.len());
+}
+
+/// C-ABI callback through which source plugins hand over a postcard-encoded
+/// [`ProducedMessages`] batch.
+///
+/// The batch is forwarded to the channel registered for `plugin_id`. Batches with no
+/// registered sender, a null pointer, invalid encoding or a closed receiver are dropped
+/// with an error log.
+extern "C" fn handle_produced_messages(
+    plugin_id: u32,
+    messages_ptr: *const u8,
+    messages_len: usize,
+) {
+    let Some(sender) = SOURCE_SENDERS.get(&plugin_id) else {
+        error!("No sender registered for source with ID: {plugin_id}, dropping produced messages.");
+        return;
+    };
+
+    if messages_ptr.is_null() {
+        error!("Received a null messages pointer from source with ID: {plugin_id}.");
+        return;
+    }
+
+    // SAFETY: the pointer is non-null (checked above). This relies on the plugin passing
+    // `messages_len` initialized bytes that stay valid until this callback returns.
+    let messages = unsafe { std::slice::from_raw_parts(messages_ptr, messages_len) };
+    let Ok(messages) = postcard::from_bytes::<ProducedMessages>(messages) else {
+        error!("Failed to deserialize produced messages.");
+        return;
+    };
+
+    if sender.send(messages).is_err() {
+        error!("Failed to forward produced messages for source with ID: {plugin_id}, the receiver is closed.");
+    }
+}
+
+/// Builds an Iggy message from an encoded payload, setting the ID and user headers only when
+/// they are present.
+fn build_iggy_message(
+    payload: Vec<u8>,
+    id: Option<u128>,
+    headers: Option<HashMap<HeaderKey, HeaderValue>>,
+) -> Result<IggyMessage, IggyError> {
+    match (id, headers) {
+        (Some(id), Some(h)) => IggyMessage::builder()
+            .payload(payload.into())
+            .id(id)
+            .user_headers(h)
+            .build(),
+        (Some(id), None) => IggyMessage::builder()
+            .payload(payload.into())
+            .id(id)
+            .build(),
+        (None, Some(h)) => IggyMessage::builder()
+            .payload(payload.into())
+            .user_headers(h)
+            .build(),
+        (None, None) => IggyMessage::builder().payload(payload.into()).build(),
+    }
+}
diff --git a/core/connectors/runtime/src/transform.rs b/core/connectors/runtime/src/transform.rs
new file mode 100644
index 00000000..2417fbfa
--- /dev/null
+++ b/core/connectors/runtime/src/transform.rs
@@ -0,0 +1,66 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use std::sync::Arc;
+
+use iggy_connector_sdk::transforms::{
+    Transform, TransformType, add_fields::AddFields, delete_fields::DeleteFields,
+};
+use serde::{Deserialize, de::DeserializeOwned};
+use tracing::error;
+
+use crate::{InitialTransformConfig, RuntimeError, TransformsConfig};
+
+/// Builds the enabled transforms described by `config`.
+///
+/// An entry is skipped when its `enabled` flag is missing, `false` or unparsable. The config
+/// is backed by a `HashMap`, whose iteration order is unspecified, so entries are sorted by
+/// transform type name to keep the pipeline order deterministic across runs.
+///
+/// # Errors
+/// Returns [`RuntimeError::InvalidConfig`] when an enabled transform has an invalid config.
+pub fn load(config: TransformsConfig) -> Result<Vec<Arc<dyn Transform>>, RuntimeError> {
+    let mut entries: Vec<(TransformType, serde_json::Value)> =
+        config.transforms.into_iter().collect();
+    entries.sort_by_cached_key(|(r#type, _)| r#type.to_string());
+
+    let mut transforms: Vec<Arc<dyn Transform>> = Vec::with_capacity(entries.len());
+    for (r#type, transform_config) in entries {
+        // Read only the `enabled` flag first, borrowing the value instead of cloning it.
+        let enabled = InitialTransformConfig::deserialize(&transform_config)
+            .map(|initial_config| initial_config.enabled)
+            .unwrap_or_default();
+        if !enabled {
+            continue;
+        }
+
+        transforms.push(load_transform(r#type, transform_config)?);
+    }
+
+    Ok(transforms)
+}
+
+/// Instantiates the transform of the given type from its full JSON config.
+fn load_transform(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<Arc<dyn Transform>, RuntimeError> {
+    Ok(match r#type {
+        TransformType::AddFields => Arc::new(AddFields::new(as_config(r#type, config)?)),
+        TransformType::DeleteFields => Arc::new(DeleteFields::new(as_config(r#type, config)?)),
+    })
+}
+
+/// Deserializes a transform config, logging the cause and mapping failures to
+/// [`RuntimeError::InvalidConfig`].
+fn as_config<T: DeserializeOwned>(
+    r#type: TransformType,
+    config: serde_json::Value,
+) -> Result<T, RuntimeError> {
+    serde_json::from_value::<T>(config).map_err(|error| {
+        error!("Failed to deserialize config for transform: {type}. {error}");
+        RuntimeError::InvalidConfig
+    })
+}
diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml
new file mode 100644
index 00000000..7c96ed3c
--- /dev/null
+++ b/core/connectors/sdk/Cargo.toml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_sdk" +version = "0.1.0" +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +dashmap = { workspace = true } +iggy = { workspace = true } +once_cell = { workspace = true } +postcard = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +strum_macros = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +uuid = { workspace = true } diff --git a/core/connectors/sdk/src/decoders/json.rs b/core/connectors/sdk/src/decoders/json.rs new file mode 100644 index 00000000..f2bda418 --- /dev/null +++ b/core/connectors/sdk/src/decoders/json.rs @@ -0,0 +1,37 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Decodes raw bytes into a [`Payload::Json`] value using `simd_json`.
+pub struct JsonStreamDecoder;
+
+impl StreamDecoder for JsonStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    /// Parses `payload` into an owned JSON value. `simd_json` works on a
+    /// mutable buffer, which is why the bytes are taken by value as `mut`.
+    ///
+    /// # Errors
+    ///
+    /// Logs the parser error and returns `Error::CannotDecode(Schema::Json)`
+    /// when the bytes are not valid JSON.
+    fn decode(&self, mut payload: Vec<u8>) -> Result<Payload, Error> {
+        let parsed = match simd_json::to_owned_value(&mut payload) {
+            Ok(value) => value,
+            Err(error) => {
+                error!("Failed to decode JSON payload: {error}");
+                return Err(Error::CannotDecode(self.schema()));
+            }
+        };
+        Ok(Payload::Json(parsed))
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/mod.rs b/core/connectors/sdk/src/decoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Built-in [`StreamDecoder`](crate::StreamDecoder) implementations, one per
+//! [`Schema`](crate::Schema) variant (selected via `Schema::decoder`).
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/decoders/raw.rs b/core/connectors/sdk/src/decoders/raw.rs
new file mode 100644
index 00000000..efbddc12
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/raw.rs
@@ -0,0 +1,31 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+
+/// Pass-through decoder: wraps the incoming bytes as [`Payload::Raw`]
+/// without inspecting them.
+pub struct RawStreamDecoder;
+
+impl StreamDecoder for RawStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    /// Never fails; the bytes are moved into the payload unchanged.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        let raw = Payload::Raw(payload);
+        Ok(raw)
+    }
+}
diff --git a/core/connectors/sdk/src/decoders/text.rs b/core/connectors/sdk/src/decoders/text.rs
new file mode 100644
index 00000000..6d7722cb
--- /dev/null
+++ b/core/connectors/sdk/src/decoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamDecoder};
+use tracing::error;
+
+/// Decodes UTF-8 bytes into [`Payload::Text`].
+pub struct TextStreamDecoder;
+
+impl StreamDecoder for TextStreamDecoder {
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    /// # Errors
+    ///
+    /// Logs the UTF-8 error and returns `Error::CannotDecode(Schema::Text)`
+    /// when `payload` is not valid UTF-8.
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error> {
+        String::from_utf8(payload)
+            .map(Payload::Text)
+            .map_err(|error| {
+                error!("Failed to decode text payload: {error}");
+                Error::CannotDecode(self.schema())
+            })
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/json.rs b/core/connectors/sdk/src/encoders/json.rs
new file mode 100644
index 00000000..03f4e4f0
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/json.rs
@@ -0,0 +1,39 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Serializes [`Payload::Json`] and [`Payload::Text`] payloads to JSON bytes.
+pub struct JsonStreamEncoder;
+
+impl StreamEncoder for JsonStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Json
+    }
+
+    /// # Errors
+    ///
+    /// * `Error::InvalidJsonPayload` if `simd_json` fails to serialize the value.
+    /// * `Error::InvalidPayloadType` for [`Payload::Raw`], which this encoder rejects.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            // Text is serialized as a JSON string literal, not parsed as JSON.
+            Payload::Text(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            // Listed explicitly so a new `Payload` variant forces a decision here.
+            Payload::Raw(_) => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/mod.rs b/core/connectors/sdk/src/encoders/mod.rs
new file mode 100644
index 00000000..e1b7e1b7
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Built-in [`StreamEncoder`](crate::StreamEncoder) implementations, one per
+//! [`Schema`](crate::Schema) variant (selected via `Schema::encoder`).
+
+pub mod json;
+pub mod raw;
+pub mod text;
diff --git a/core/connectors/sdk/src/encoders/raw.rs b/core/connectors/sdk/src/encoders/raw.rs
new file mode 100644
index 00000000..68b3b9bb
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/raw.rs
@@ -0,0 +1,34 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Encoder for already-serialized bytes: only [`Payload::Raw`] is accepted.
+pub struct RawStreamEncoder;
+
+impl StreamEncoder for RawStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Raw
+    }
+
+    /// Returns the raw bytes unchanged; every other payload variant yields
+    /// `Error::InvalidPayloadType`.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        if let Payload::Raw(bytes) = payload {
+            Ok(bytes)
+        } else {
+            Err(Error::InvalidPayloadType)
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/encoders/text.rs b/core/connectors/sdk/src/encoders/text.rs
new file mode 100644
index 00000000..6fb2d4e9
--- /dev/null
+++ b/core/connectors/sdk/src/encoders/text.rs
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Error, Payload, Schema, StreamEncoder};
+
+/// Encodes [`Payload::Text`] as its UTF-8 bytes and [`Payload::Json`] as its
+/// serialized JSON text.
+pub struct TextStreamEncoder;
+
+impl StreamEncoder for TextStreamEncoder {
+    fn schema(&self) -> Schema {
+        Schema::Text
+    }
+
+    /// # Errors
+    ///
+    /// * `Error::InvalidJsonPayload` if serializing a JSON payload fails.
+    /// * `Error::InvalidPayloadType` for [`Payload::Raw`], which this encoder rejects.
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error> {
+        match payload {
+            Payload::Text(value) => Ok(value.into_bytes()),
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            // Listed explicitly so a new `Payload` variant forces a decision here.
+            Payload::Raw(_) => Err(Error::InvalidPayloadType),
+        }
+    }
+}
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
new file mode 100644
index 00000000..749c666e
--- /dev/null
+++ b/core/connectors/sdk/src/lib.rs
@@ -0,0 +1,239 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use decoders::{json::JsonStreamDecoder, raw::RawStreamDecoder, text::TextStreamDecoder};
+use encoders::{json::JsonStreamEncoder, raw::RawStreamEncoder, text::TextStreamEncoder};
+use iggy::prelude::{HeaderKey, HeaderValue};
+use serde::{Deserialize, Serialize};
+use std::{
+    collections::HashMap,
+    sync::{Arc, OnceLock},
+};
+use strum_macros::{Display, IntoStaticStr};
+use thiserror::Error;
+use tokio::runtime::Runtime;
+
+pub mod decoders;
+pub mod encoders;
+pub mod sink;
+pub mod source;
+pub mod transforms;
+
+static RUNTIME: OnceLock<Runtime> = OnceLock::new();
+
+/// Returns the Tokio runtime that the sink and source containers use to drive
+/// connector futures, creating it on first use.
+///
+/// # Panics
+///
+/// Panics if the runtime cannot be created.
+pub fn get_runtime() -> &'static Runtime {
+    RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime"))
+}
+
+/// A connector that produces messages.
+///
+/// [`source::SourceContainer`] calls `open` from its own `open`, then `poll`s
+/// repeatedly on a background task until shutdown, and finally `close`s it.
+#[async_trait]
+pub trait Source: Send + Sync {
+    async fn open(&mut self) -> Result<(), Error>;
+    async fn poll(&self) -> Result<ProducedMessages, Error>;
+    async fn close(&mut self);
+}
+
+/// A connector that consumes messages.
+///
+/// [`sink::SinkContainer`] calls `open` from its own `open`, `consume` once per
+/// decoded batch, and `close` when the container is closed.
+#[async_trait]
+pub trait Sink: Send + Sync {
+    async fn open(&mut self) -> Result<(), Error>;
+    async fn consume(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error>;
+    async fn close(&mut self);
+}
+
+/// A message payload decoded according to a [`Schema`].
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Payload {
+    Json(simd_json::OwnedValue),
+    Raw(Vec<u8>),
+    Text(String),
+}
+
+impl Payload {
+    /// Converts the payload back into bytes: JSON is serialized with
+    /// `simd_json`, raw bytes are returned as-is and text as its UTF-8 bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::InvalidJsonPayload`] if JSON serialization fails.
+    pub fn try_into_vec(self) -> Result<Vec<u8>, Error> {
+        match self {
+            Payload::Json(value) => {
+                simd_json::to_vec(&value).map_err(|_| Error::InvalidJsonPayload)
+            }
+            Payload::Raw(value) => Ok(value),
+            Payload::Text(text) => Ok(text.into_bytes()),
+        }
+    }
+}
+
+impl std::fmt::Display for Payload {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Payload::Json(value) => write!(
+                f,
+                "Json({})",
+                simd_json::to_string_pretty(value).unwrap_or_default()
+            ),
+            Payload::Raw(value) => write!(f, "Raw({:#?})", value),
+            Payload::Text(text) => write!(f, "Text({})", text),
+        }
+    }
+}
+
+/// Payload encoding; serialized (serde) and displayed (strum) in snake_case.
+#[repr(C)]
+#[derive(
+    Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Display, IntoStaticStr,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum Schema {
+    #[strum(to_string = "json")]
+    Json,
+    #[strum(to_string = "raw")]
+    Raw,
+    #[strum(to_string = "text")]
+    Text,
+}
+
+impl Schema {
+    /// Decodes `value` into the [`Payload`] variant matching this schema.
+    ///
+    /// Unlike the [`StreamDecoder`] returned by [`Schema::decoder`], this does
+    /// not log and reports failures as [`Error::InvalidJsonPayload`] or
+    /// [`Error::InvalidTextPayload`].
+    pub fn try_into_payload(self, mut value: Vec<u8>) -> Result<Payload, Error> {
+        match self {
+            Schema::Json => simd_json::to_owned_value(&mut value)
+                .map(Payload::Json)
+                .map_err(|_| Error::InvalidJsonPayload),
+            Schema::Raw => Ok(Payload::Raw(value)),
+            Schema::Text => String::from_utf8(value)
+                .map(Payload::Text)
+                .map_err(|_| Error::InvalidTextPayload),
+        }
+    }
+
+    /// Returns the built-in decoder for this schema.
+    pub fn decoder(self) -> Arc<dyn StreamDecoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamDecoder),
+            Schema::Raw => Arc::new(RawStreamDecoder),
+            Schema::Text => Arc::new(TextStreamDecoder),
+        }
+    }
+
+    /// Returns the built-in encoder for this schema.
+    pub fn encoder(self) -> Arc<dyn StreamEncoder> {
+        match self {
+            Schema::Json => Arc::new(JsonStreamEncoder),
+            Schema::Raw => Arc::new(RawStreamEncoder),
+            Schema::Text => Arc::new(TextStreamEncoder),
+        }
+    }
+}
+
+/// Identifies the stream and topic a batch passed to [`Sink::consume`] belongs to.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TopicMetadata {
+    pub stream: String,
+    pub topic: String,
+}
+
+/// Batch-level metadata passed to [`Sink::consume`]; `schema` selects how the
+/// message payloads are decoded.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct MessagesMetadata {
+    pub partition_id: u32,
+    pub current_offset: u64,
+    pub schema: Schema,
+}
+
+/// A message with its payload kept as raw bytes.
+#[repr(C)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ReceivedMessage {
+    pub id: u128,
+    pub offset: u64,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Vec<u8>,
+}
+
+/// Batch returned by [`Source::poll`]; the source container serializes it
+/// with postcard before handing it to the runtime callback.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ProducedMessages {
+    pub schema: Schema,
+    pub messages: Vec<ProducedMessage>,
+}
+
+/// A single message produced by a source; `id` is optional.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ProducedMessage {
+    pub id: Option<u128>,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Vec<u8>,
+}
+
+/// A message with a decoded [`Payload`], as seen by transforms.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DecodedMessage {
+    pub id: Option<u128>,
+    pub offset: Option<u64>,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Payload,
+}
+
+/// Postcard wire format of a batch decoded by `SinkContainer::consume`.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RawMessages {
+    pub schema: Schema,
+    pub messages: Vec<RawMessage>,
+}
+
+/// A message on the wire: `headers` holds postcard-encoded headers and is
+/// empty when the message has none.
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RawMessage {
+    pub offset: u64,
+    pub headers: Vec<u8>,
+    pub payload: Vec<u8>,
+}
+
+/// A decoded message handed to [`Sink::consume`].
+#[repr(C)]
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ConsumedMessage {
+    pub offset: u64,
+    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub payload: Payload,
+}
+
+/// Turns raw bytes into a [`Payload`] of a fixed [`Schema`].
+pub trait StreamDecoder: Send + Sync {
+    fn schema(&self) -> Schema;
+    fn decode(&self, payload: Vec<u8>) -> Result<Payload, Error>;
+}
+
+/// Turns a [`Payload`] into bytes of a fixed [`Schema`].
+pub trait StreamEncoder: Send + Sync {
+    fn schema(&self) -> Schema;
+    fn encode(&self, payload: Payload) -> Result<Vec<u8>, Error>;
+}
+
+/// Errors reported by connectors, codecs and transforms.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Error)]
+pub enum Error {
+    #[error("Invalid config")]
+    InvalidConfig,
+    #[error("Invalid record")]
+    InvalidRecord,
+    #[error("Invalid transformer")]
+    InvalidTransformer,
+    #[error("HTTP request failed: {0}")]
+    HttpRequestFailed(String),
+    #[error("Init error: {0}")]
+    InitError(String),
+    #[error("Invalid payload type")]
+    InvalidPayloadType,
+    #[error("Invalid JSON payload.")]
+    InvalidJsonPayload,
+    #[error("Invalid text payload.")]
+    InvalidTextPayload,
+    #[error("Cannot decode schema {0}")]
+    CannotDecode(Schema),
+}
diff
--git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
new file mode 100644
index 00000000..bd836ac6
--- /dev/null
+++ b/core/connectors/sdk/src/sink.rs
@@ -0,0 +1,240 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::de::DeserializeOwned;
+use tokio::sync::watch;
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
+use crate::{ConsumedMessage, MessagesMetadata, RawMessages, Sink, TopicMetadata, get_runtime};
+
+/// Function-pointer type with the same parameters as the `consume` symbol
+/// exported by `sink_connector!`.
+pub type ConsumeCallback = extern "C" fn(
+    plugin_id: u32,
+    topic_meta_ptr: *const u8,
+    topic_meta_len: usize,
+    messages_meta_ptr: *const u8,
+    messages_meta_len: usize,
+    messages_ptr: *const u8,
+    messages_len: usize,
+) -> i32;
+
+/// Holds one sink instance behind the FFI functions generated by `sink_connector!`.
+#[derive(Debug)]
+pub struct SinkContainer<T: Sink + std::fmt::Debug> {
+    id: u32,
+    sink: Option<T>,
+    shutdown: Option<watch::Sender<()>>,
+}
+
+impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
+    pub const fn new(id: u32) -> Self {
+        Self {
+            id,
+            sink: None,
+            shutdown: None,
+        }
+    }
+
+    /// Deserializes the JSON config, builds the sink via `factory` and runs its
+    /// `open` hook on the shared runtime.
+    ///
+    /// Returns `0` on success, `1` when the sink's own `open` fails (the sink is
+    /// stored regardless), and `-1` when the config is null, not UTF-8 or not
+    /// deserializable.
+    ///
+    /// # Safety
+    ///
+    /// `config_ptr` must either be null (rejected with `-1`) or point to
+    /// `config_len` bytes that stay readable for the duration of this call.
+    pub unsafe fn open<F, C>(
+        &mut self,
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        factory: F,
+    ) -> i32
+    where
+        F: FnOnce(u32, C) -> T,
+        C: DeserializeOwned,
+    {
+        _ = fmt::try_init();
+        if config_ptr.is_null() {
+            error!("Sink with ID: {id} received a null config pointer.");
+            return -1;
+        }
+
+        // SAFETY: non-null was checked above; the caller guarantees that
+        // `config_len` bytes behind `config_ptr` are readable for this call.
+        let bytes = unsafe { std::slice::from_raw_parts(config_ptr, config_len) };
+        let Ok(config_str) = std::str::from_utf8(bytes) else {
+            error!("Sink with ID: {id} received a config that is not valid UTF-8.");
+            return -1;
+        };
+
+        let config = match serde_json::from_str(config_str) {
+            Ok(config) => config,
+            Err(error) => {
+                error!("Failed to deserialize config for sink with ID: {id}. {error}");
+                return -1;
+            }
+        };
+
+        let mut sink = factory(id, config);
+        let result = get_runtime().block_on(sink.open());
+        if let Err(error) = &result {
+            error!("Failed to open sink with ID: {id}. {error}");
+        }
+        self.id = id;
+        self.sink = Some(sink);
+        if result.is_ok() { 0 } else { 1 }
+    }
+
+    /// Closes the sink, returning `0` on success or `-1` if it was never opened.
+    ///
+    /// # Safety
+    ///
+    /// Performs no unsafe operations itself; the `unsafe` marker is kept for
+    /// API compatibility with the other container entry points.
+    pub unsafe fn close(&mut self) -> i32 {
+        let Some(mut sink) = self.sink.take() else {
+            error!(
+                "Sink with ID: {} is not initialized - cannot close.",
+                self.id
+            );
+            return -1;
+        };
+
+        info!("Closing sink with ID: {}...", self.id);
+        if let Some(tx) = self.shutdown.take() {
+            let _ = tx.send(());
+        }
+
+        let runtime = get_runtime();
+        runtime.block_on(sink.close());
+        info!("Closed sink with ID: {}", self.id);
+        0
+    }
+
+    /// Decodes a postcard-encoded batch and passes it to the sink's `consume`.
+    ///
+    /// Messages whose headers or payload fail to decode are logged and skipped.
+    /// Returns `0` on success, `1` when the sink's `consume` fails and `-1`
+    /// when the sink is not open or the batch/metadata cannot be decoded.
+    ///
+    /// # Safety
+    ///
+    /// Every pointer must be non-null and point to the given number of bytes,
+    /// readable for the duration of this call.
+    pub unsafe fn consume(
+        &self,
+        topic_meta_ptr: *const u8,
+        topic_meta_len: usize,
+        messages_meta_ptr: *const u8,
+        messages_meta_len: usize,
+        messages_ptr: *const u8,
+        messages_len: usize,
+    ) -> i32 {
+        let Some(sink) = self.sink.as_ref() else {
+            error!(
+                "Sink with ID: {} is not initialized - cannot consume.",
+                self.id
+            );
+            return -1;
+        };
+
+        // SAFETY: the caller guarantees every pointer/length pair describes a
+        // readable buffer for the duration of this call. The buffers are only
+        // borrowed while decoding below, so the batch is not copied.
+        let (topic_meta_bytes, messages_meta_bytes, messages_bytes) = unsafe {
+            (
+                std::slice::from_raw_parts(topic_meta_ptr, topic_meta_len),
+                std::slice::from_raw_parts(messages_meta_ptr, messages_meta_len),
+                std::slice::from_raw_parts(messages_ptr, messages_len),
+            )
+        };
+
+        let Ok(topic_metadata) = postcard::from_bytes::<TopicMetadata>(topic_meta_bytes) else {
+            error!("Failed to decode topic metadata");
+            return -1;
+        };
+
+        let Ok(messages_metadata) =
+            postcard::from_bytes::<MessagesMetadata>(messages_meta_bytes)
+        else {
+            error!("Failed to decode messages metadata");
+            return -1;
+        };
+
+        let Ok(raw_messages) = postcard::from_bytes::<RawMessages>(messages_bytes) else {
+            error!("Failed to decode raw messages");
+            return -1;
+        };
+
+        let mut messages = Vec::with_capacity(raw_messages.messages.len());
+        for message in raw_messages.messages {
+            // An empty header buffer means the message carries no headers.
+            let headers = if message.headers.is_empty() {
+                None
+            } else {
+                let Ok(headers) = postcard::from_bytes(&message.headers) else {
+                    error!("Failed to decode message headers");
+                    continue;
+                };
+                Some(headers)
+            };
+
+            let Ok(payload) = messages_metadata.schema.try_into_payload(message.payload) else {
+                error!("Failed to decode message payload");
+                continue;
+            };
+
+            messages.push(ConsumedMessage {
+                offset: message.offset,
+                headers,
+                payload,
+            });
+        }
+
+        let result =
+            get_runtime().block_on(sink.consume(&topic_metadata, messages_metadata, messages));
+        if let Err(error) = &result {
+            error!("Sink with ID: {} failed to consume messages. {error}", self.id);
+        }
+        if result.is_ok() { 0 } else { 1 }
+    }
+}
+
+/// Exports the C ABI entry points (`open`, `consume`, `close`) for a sink
+/// plugin backed by `$type`.
+///
+/// `$type` must implement [`Sink`](crate::Sink) and provide
+/// `fn new(id: u32, config: C) -> Self` for a deserializable `C`. The calling
+/// crate must depend on `dashmap` and `once_cell`, which the expansion imports.
+#[macro_export]
+macro_rules! sink_connector {
+    ($type:ty) => {
+        const _: fn() = || {
+            fn assert_trait<T: $crate::Sink>() {}
+            assert_trait::<$type>();
+        };
+
+        use dashmap::DashMap;
+        use once_cell::sync::Lazy;
+        use $crate::sink::SinkContainer;
+
+        static INSTANCES: Lazy<DashMap<u32, SinkContainer<$type>>> = Lazy::new(DashMap::new);
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
+            let mut container = SinkContainer::new(id);
+            // SAFETY: the pointer/length pair is forwarded unchanged, so this
+            // function's caller upholds `SinkContainer::open`'s contract.
+            let result = unsafe { container.open(id, config_ptr, config_len, <$type>::new) };
+            INSTANCES.insert(id, container);
+            result
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn consume(
+            id: u32,
+            topic_meta_ptr: *const u8,
+            topic_meta_len: usize,
+            messages_meta_ptr: *const u8,
+            messages_meta_len: usize,
+            messages_ptr: *const u8,
+            messages_len: usize,
+        ) -> i32 {
+            let Some(instance) = INSTANCES.get(&id) else {
+                eprintln!(
+                    "Sink connector with ID: {id} was not found and consume cannot be invoked."
+                );
+                return -1;
+            };
+            // SAFETY: all pointer/length pairs are forwarded unchanged, so this
+            // function's caller upholds `SinkContainer::consume`'s contract.
+            unsafe {
+                instance.consume(
+                    topic_meta_ptr,
+                    topic_meta_len,
+                    messages_meta_ptr,
+                    messages_meta_len,
+                    messages_ptr,
+                    messages_len,
+                )
+            }
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn close(id: u32) -> i32 {
+            let Some(mut instance) = INSTANCES.remove(&id) else {
+                eprintln!("Sink connector with ID: {id} was not found and cannot be closed.");
+                return -1;
+            };
+            // SAFETY: `SinkContainer::close` dereferences no pointers.
+            unsafe { instance.1.close() }
+        }
+    };
+}
diff --git a/core/connectors/sdk/src/source.rs b/core/connectors/sdk/src/source.rs
new file mode 100644
index 00000000..ef47b7ee
--- /dev/null
+++ b/core/connectors/sdk/src/source.rs
@@ -0,0 +1,218 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::{Source, get_runtime};
+use serde::de::DeserializeOwned;
+use std::sync::Arc;
+use tokio::{sync::watch, task::JoinHandle};
+use tracing::{error, info};
+use tracing_subscriber::fmt;
+
+/// C-layout message descriptor whose header and payload bytes are referenced
+/// through raw pointer/length pairs.
+#[repr(C)]
+pub struct RawMessage {
+    pub offset: u64,
+    pub headers_ptr: *const u8,
+    pub headers_len: usize,
+    pub payload_ptr: *const u8,
+    pub payload_len: usize,
+}
+
+/// Function-pointer type matching the `handle` symbol exported by `source_connector!`.
+pub type HandleCallback = extern "C" fn(plugin_id: u32, callback: SendCallback) -> i32;
+
+/// Receives every polled batch as a postcard-encoded `ProducedMessages`
+/// buffer; the buffer is only valid for the duration of the call.
+pub type SendCallback = extern "C" fn(plugin_id: u32, messages_ptr: *const u8, messages_len: usize);
+
+/// Holds one source instance, plus its polling task, behind the FFI functions
+/// generated by `source_connector!`.
+#[derive(Debug)]
+pub struct SourceContainer<T: Source + std::fmt::Debug> {
+    id: u32,
+    source: Option<Arc<T>>,
+    shutdown: Option<watch::Sender<()>>,
+    task: Option<JoinHandle<()>>,
+}
+
+impl<T: Source + std::fmt::Debug + 'static> SourceContainer<T> {
+    pub const fn new(id: u32) -> Self {
+        Self {
+            id,
+            source: None,
+            shutdown: None,
+            task: None,
+        }
+    }
+
+    /// Deserializes the JSON config, builds the source via `factory` and runs
+    /// its `open` hook on the shared runtime.
+    ///
+    /// Returns `0` on success, `1` when the source's own `open` fails (the
+    /// source is stored regardless), and `-1` when the config is null, not
+    /// UTF-8 or not deserializable.
+    ///
+    /// # Safety
+    ///
+    /// `config_ptr` must either be null (rejected with `-1`) or point to
+    /// `config_len` bytes that stay readable for the duration of this call.
+    pub unsafe fn open<F, C>(
+        &mut self,
+        id: u32,
+        config_ptr: *const u8,
+        config_len: usize,
+        factory: F,
+    ) -> i32
+    where
+        F: FnOnce(u32, C) -> T,
+        C: DeserializeOwned,
+    {
+        _ = fmt::try_init();
+        if config_ptr.is_null() {
+            error!("Source with ID: {id} received a null config pointer.");
+            return -1;
+        }
+
+        // SAFETY: non-null was checked above; the caller guarantees that
+        // `config_len` bytes behind `config_ptr` are readable for this call.
+        let bytes = unsafe { std::slice::from_raw_parts(config_ptr, config_len) };
+        let Ok(config_str) = std::str::from_utf8(bytes) else {
+            error!("Source with ID: {id} received a config that is not valid UTF-8.");
+            return -1;
+        };
+
+        let config = match serde_json::from_str(config_str) {
+            Ok(config) => config,
+            Err(error) => {
+                error!("Failed to deserialize config for source with ID: {id}. {error}");
+                return -1;
+            }
+        };
+
+        let mut source = factory(id, config);
+        let result = get_runtime().block_on(source.open());
+        if let Err(error) = &result {
+            error!("Failed to open source with ID: {id}. {error}");
+        }
+        self.id = id;
+        self.source = Some(Arc::new(source));
+        if result.is_ok() { 0 } else { 1 }
+    }
+
+    /// Stops the polling task (if any), waits for it and closes the source.
+    ///
+    /// Returns `0` on success and `-1` if the source was never opened or is
+    /// still shared and therefore cannot be closed.
+    ///
+    /// # Safety
+    ///
+    /// Performs no unsafe operations itself; the `unsafe` marker is kept for
+    /// API compatibility with the other container entry points.
+    pub unsafe fn close(&mut self) -> i32 {
+        let Some(source) = self.source.take() else {
+            error!(
+                "Source with ID: {} is not initialized - cannot close.",
+                self.id
+            );
+            return -1;
+        };
+
+        info!("Closing source with ID: {}...", self.id);
+        if let Some(sender) = self.shutdown.take() {
+            let _ = sender.send(());
+        }
+
+        let runtime = get_runtime();
+        // Waiting for the task makes it drop its `Arc` clone of the source.
+        if let Some(Err(error)) = self.task.take().map(|handle| runtime.block_on(handle)) {
+            error!("Polling task of source with ID: {} failed: {error}", self.id);
+        }
+
+        // `try_unwrap` fails only while other `Arc` clones are still alive.
+        let Ok(mut source) = Arc::try_unwrap(source) else {
+            error!(
+                "Source with ID: {} is still in use and cannot be closed.",
+                self.id
+            );
+            return -1;
+        };
+
+        runtime.block_on(source.close());
+        info!("Closed source with ID: {}", self.id);
+        0
+    }
+
+    /// Spawns a task on the shared runtime that polls the source and passes
+    /// every batch to `callback` until [`SourceContainer::close`] is called.
+    ///
+    /// Returns `0` when the task was started and `-1` if the source is not open.
+    ///
+    /// # Safety
+    ///
+    /// No pointers are dereferenced here. `callback` is later invoked from a
+    /// task on the shared runtime and must tolerate being called from there.
+    pub unsafe fn handle(&mut self, callback: SendCallback) -> i32 {
+        let Some(source) = self.source.as_ref() else {
+            error!(
+                "Source with ID: {} is not initialized - cannot handle.",
+                self.id
+            );
+            return -1;
+        };
+
+        let runtime = get_runtime();
+        let (shutdown_tx, shutdown_rx) = watch::channel(());
+        let plugin_id = self.id;
+        let source = Arc::clone(source);
+        let handle = runtime.spawn(handle_messages(plugin_id, source, callback, shutdown_rx));
+
+        self.shutdown = Some(shutdown_tx);
+        self.task = Some(handle);
+        0
+    }
+}
+
+/// Polls `source` until a shutdown signal arrives, passing every successfully
+/// polled and serialized batch to `callback`.
+async fn handle_messages<T: Source>(
+    plugin_id: u32,
+    source: Arc<T>,
+    callback: SendCallback,
+    mut shutdown: watch::Receiver<()>,
+) {
+    loop {
+        tokio::select! {
+            // Also resolves (with an error) once the sender has been dropped.
+            _ = shutdown.changed() => {
+                info!("Shutting down source container with ID: {plugin_id}");
+                break;
+            }
+            messages = source.poll() => {
+                // NOTE(review): a source whose `poll` fails immediately makes this
+                // loop retry without any delay; consider a backoff.
+                let messages = match messages {
+                    Ok(messages) => messages,
+                    Err(error) => {
+                        error!("Failed to poll messages for source container with ID: {plugin_id}. {error}");
+                        continue;
+                    }
+                };
+
+                let messages = match postcard::to_allocvec(&messages) {
+                    Ok(messages) => messages,
+                    Err(error) => {
+                        error!("Failed to serialize messages for source container with ID: {plugin_id}. {error}");
+                        continue;
+                    }
+                };
+
+                // The buffer outlives the synchronous callback invocation.
+                callback(plugin_id, messages.as_ptr(), messages.len());
+            }
+        }
+    }
+}
+
+/// Exports the C ABI entry points (`open`, `handle`, `close`) for a source
+/// plugin backed by `$type`.
+///
+/// `$type` must implement [`Source`](crate::Source) and provide
+/// `fn new(id: u32, config: C) -> Self` for a deserializable `C`. The calling
+/// crate must depend on `dashmap` and `once_cell`, which the expansion imports.
+#[macro_export]
+macro_rules! source_connector {
+    ($type:ty) => {
+        const _: fn() = || {
+            fn assert_trait<T: $crate::Source>() {}
+            assert_trait::<$type>();
+        };
+
+        use dashmap::DashMap;
+        use once_cell::sync::Lazy;
+        use $crate::source::SendCallback;
+        use $crate::source::SourceContainer;
+
+        static INSTANCES: Lazy<DashMap<u32, SourceContainer<$type>>> = Lazy::new(DashMap::new);
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn open(id: u32, config_ptr: *const u8, config_len: usize) -> i32 {
+            let mut container = SourceContainer::new(id);
+            // SAFETY: the pointer/length pair is forwarded unchanged, so this
+            // function's caller upholds `SourceContainer::open`'s contract.
+            let result = unsafe { container.open(id, config_ptr, config_len, <$type>::new) };
+            INSTANCES.insert(id, container);
+            result
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn handle(id: u32, callback: SendCallback) -> i32 {
+            let Some(mut instance) = INSTANCES.get_mut(&id) else {
+                eprintln!("Source connector with ID: {id} was not found and cannot be handled.");
+                return -1;
+            };
+            // SAFETY: `SourceContainer::handle` dereferences no pointers.
+            unsafe { instance.handle(callback) }
+        }
+
+        #[unsafe(no_mangle)]
+        unsafe extern "C" fn close(id: u32) -> i32 {
+            let Some(mut instance) = INSTANCES.remove(&id) else {
+                eprintln!("Source connector with ID: {id} was not found and cannot be closed.");
+                return -1;
+            };
+            // SAFETY: `SourceContainer::close` dereferences no pointers.
+            unsafe { instance.1.close() }
+        }
+    };
+}
diff --git a/core/connectors/sdk/src/transforms/add_fields.rs b/core/connectors/sdk/src/transforms/add_fields.rs
new file mode 100644
index 00000000..569a16b8
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/add_fields.rs
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use serde::{Deserialize, Serialize};
+use simd_json::OwnedValue;
+use strum_macros::{Display, IntoStaticStr};
+
+use crate::{DecodedMessage, Error, Payload, TopicMetadata};
+
+use super::{Transform, TransformType};
+
+/// Configuration of the [`AddFields`] transform: the fields to add, applied in order.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AddFieldsConfig {
+    fields: Vec<Field>,
+}
+
+/// Inserts the configured fields into JSON object payloads, replacing any
+/// value already stored under the same key. Other payloads pass through.
+pub struct AddFields {
+    fields: Vec<Field>,
+}
+
+/// A single key/value pair to add.
+#[derive(Debug, Serialize, Deserialize)]
+struct Field {
+    key: String,
+    value: FieldValue,
+}
+
+/// Either a fixed JSON value or one computed for every message.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum FieldValue {
+    Static(simd_json::OwnedValue),
+    Computed(ComputedValue),
+}
+
+/// Values generated at transform time.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
+#[serde(rename_all = "snake_case")]
+enum ComputedValue {
+    #[strum(to_string = "date_time")]
+    DateTime,
+    #[strum(to_string = "timestamp_nanos")]
+    TimestampNanos,
+    #[strum(to_string = "timestamp_micros")]
+    TimestampMicros,
+    #[strum(to_string = "timestamp_millis")]
+    TimestampMillis,
+    #[strum(to_string = "timestamp_seconds")]
+    TimestampSeconds,
+    #[strum(to_string = "uuid_v4")]
+    UuidV4,
+    #[strum(to_string = "uuid_v7")]
+    UuidV7,
+}
+
+impl ComputedValue {
+    /// Produces the JSON value of this computed field; time-based variants
+    /// are derived from `now` so that all of them agree within one message.
+    fn evaluate(self, now: chrono::DateTime<chrono::Utc>) -> OwnedValue {
+        match self {
+            ComputedValue::DateTime => now.to_rfc3339().into(),
+            // `None` when the instant does not fit in i64 nanoseconds.
+            ComputedValue::TimestampNanos => now.timestamp_nanos_opt().into(),
+            ComputedValue::TimestampMicros => now.timestamp_micros().into(),
+            ComputedValue::TimestampMillis => now.timestamp_millis().into(),
+            ComputedValue::TimestampSeconds => now.timestamp().into(),
+            ComputedValue::UuidV4 => uuid::Uuid::new_v4().to_string().into(),
+            ComputedValue::UuidV7 => uuid::Uuid::now_v7().to_string().into(),
+        }
+    }
+}
+
+impl AddFields {
+    pub fn new(config: AddFieldsConfig) -> Self {
+        Self {
+            fields: config.fields,
+        }
+    }
+}
+
+impl Transform for AddFields {
+    fn r#type(&self) -> TransformType {
+        TransformType::AddFields
+    }
+
+    /// Never fails and never drops the message.
+    fn transform(
+        &self,
+        _metadata: &TopicMetadata,
+        mut message: DecodedMessage,
+    ) -> Result<Option<DecodedMessage>, Error> {
+        if self.fields.is_empty() {
+            return Ok(Some(message));
+        }
+
+        // Only JSON object payloads can receive new fields.
+        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
+            return Ok(Some(message));
+        };
+
+        // Read the clock once so every time-based field describes the same instant.
+        let now = chrono::Utc::now();
+        for field in &self.fields {
+            let value = match &field.value {
+                FieldValue::Static(value) => value.clone(),
+                FieldValue::Computed(computed) => computed.evaluate(now),
+            };
+            map.insert(field.key.clone(), value);
+        }
+
+        Ok(Some(message))
+    }
+}
diff --git a/core/connectors/sdk/src/transforms/delete_fields.rs b/core/connectors/sdk/src/transforms/delete_fields.rs
new file mode 100644
index 00000000..d7dd755a
--- /dev/null
+++ b/core/connectors/sdk/src/transforms/delete_fields.rs
@@ -0,0 +1,61 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::collections::HashSet; + +use serde::{Deserialize, Serialize}; +use simd_json::OwnedValue; + +use crate::{DecodedMessage, Error, Payload, TopicMetadata}; + +use super::{Transform, TransformType}; + +/// Configuration of the [`DeleteFields`] transform: names of the top-level keys +/// to remove from JSON object payloads. +#[derive(Debug, Serialize, Deserialize)] +pub struct DeleteFieldsConfig { + fields: Vec<String>, +} + +/// Transform that drops the configured top-level keys from JSON object payloads. +pub struct DeleteFields { + /// Keys to remove, kept in a set for fast membership checks. + fields: HashSet<String>, +} + +impl DeleteFields { + /// Builds the transform from its configuration; duplicate key names collapse. + pub fn new(config: DeleteFieldsConfig) -> Self { + let fields: HashSet<String> = config.fields.into_iter().collect(); + Self { fields } + } +} + +impl Transform for DeleteFields { + fn r#type(&self) -> TransformType { + TransformType::DeleteFields + } + + /// Removes every configured key from a JSON object payload. Payloads of any + /// other kind pass through untouched, and the message is always kept. + fn transform( + &self, + _metadata: &TopicMetadata, + mut message: DecodedMessage, + ) -> Result<Option<DecodedMessage>, Error> { + let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else { + return Ok(Some(message)); + }; + map.retain(|name, _value| !self.fields.contains(name)); + Ok(Some(message)) + } +} diff --git a/core/connectors/sdk/src/transforms/mod.rs b/core/connectors/sdk/src/transforms/mod.rs new file mode 100644 index 00000000..35cf3382 --- /dev/null +++ b/core/connectors/sdk/src/transforms/mod.rs @@ -0,0 +1,45 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use serde::{Deserialize, Serialize}; +use strum_macros::{Display, IntoStaticStr}; + +use crate::{DecodedMessage, Error, TopicMetadata}; + +pub mod add_fields; +pub mod delete_fields; + +/// A transformation applied to decoded messages of a topic. +/// +/// Implementations must be `Send + Sync`, so a single instance can be shared between threads. +pub trait Transform: Send + Sync { + /// Returns the kind of this transform. + fn r#type(&self) -> TransformType; + /// Transforms one decoded message, returning `Ok(Some(message))` with the + /// (possibly modified) message. + /// + /// NOTE(review): `Ok(None)` presumably drops the message from further + /// processing — confirm against the runtime that invokes transforms. + fn transform( + &self, + metadata: &TopicMetadata, + message: DecodedMessage, + ) -> Result<Option<DecodedMessage>, Error>; +} + +/// Identifies a transform kind. The serde representation and the strum +/// `Display`/`IntoStaticStr` representation are both the snake_case names below. +#[derive( + Debug, Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Display, IntoStaticStr, +)] +#[serde(rename_all = "snake_case")] +pub enum TransformType { + #[strum(to_string = "add_fields")] + AddFields, + #[strum(to_string = "delete_fields")] + DeleteFields, +} diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml new file mode 100644 index 00000000..cf6a2170 --- /dev/null +++ b/core/connectors/sinks/quickwit_sink/Cargo.toml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_quickwit_sink" +version = "0.1.0" +# NOTE(review): this is the generic Iggy platform description, not one specific to this connector. +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +# Built as a C-compatible dynamic library (cdylib); presumably loaded as a plugin +# by the connectors runtime — confirm. +[lib] +crate-type = ["cdylib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_yml = { workspace = true } +simd-json = { workspace = true } +tracing = { workspace = true } + +# dashmap and once_cell are not referenced from src/lib.rs directly; presumably the +# code generated by `sink_connector!` needs them — confirm. Listing them here keeps +# cargo-machete from reporting them as unused. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] diff --git a/core/connectors/sinks/quickwit_sink/src/lib.rs b/core/connectors/sinks/quickwit_sink/src/lib.rs new file mode 100644 index 00000000..cf0962cd --- /dev/null +++ b/core/connectors/sinks/quickwit_sink/src/lib.rs @@ -0,0 +1,209 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, sink_connector, +}; +use serde::{Deserialize, Serialize}; +use tracing::{error, info, warn}; + +sink_connector!(QuickwitSink); + +#[derive(Debug)] +pub struct QuickwitSink { + id: u32, + config: QuickwitSinkConfig, + client: reqwest::Client, + index_id: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct QuickwitSinkConfig { + url: String, + index: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct IndexConfig { + index_id: String, +} + +impl QuickwitSink { + pub fn new(id: u32, config: QuickwitSinkConfig) -> Self { + let index_config = + serde_yml::from_str::<IndexConfig>(&config.index).expect("Invalid index config."); + QuickwitSink { + id, + config, + index_id: index_config.index_id, + client: reqwest::Client::new(), + } + } + + async fn has_index(&self) -> Result<bool, Error> { + let url = format!("{}/api/v1/indexes/{}", self.config.url, self.index_id); + let response = self.client.get(&url).send().await.map_err(|error| { + error!( + "Failed to send HTTP request to check if index with ID: {} exists. 
{error}", + self.index_id + ); + Error::HttpRequestFailed(error.to_string()) + })?; + let status = response.status(); + if status.is_success() { + Ok(true) + } else if status == reqwest::StatusCode::NOT_FOUND { + Ok(false) + } else { + Err(Error::HttpRequestFailed(format!( + "Unexpected status code: {status}", + ))) + } + } + + async fn create_index(&self) -> Result<(), Error> { + info!("Creating index: {}", self.index_id); + let url = format!("{}/api/v1/indexes", self.config.url); + let response = self + .client + .post(&url) + .header("content-type", "application/yaml") + .body(self.config.index.to_owned()) + .send() + .await + .map_err(|error| { + error!( + "Failed to send HTTP request to create index: {}. {error}", + self.index_id + ); + Error::HttpRequestFailed(error.to_string()) + })?; + + if !response.status().is_success() { + let status = response.status(); + let reason = response.text().await.unwrap_or_default(); + error!( + "Received an invalid HTTP response when creating index: {}. Status code: {status}, reason: {reason}", + self.index_id + ); + return Err(Error::InitError(format!( + "Failed to create index: {}. {reason}", + self.index_id + ))); + } + + info!("Created index: {}", self.index_id); + Ok(()) + } + + pub async fn ingest(&self, messages: Vec<simd_json::OwnedValue>) -> Result<(), Error> { + let url = format!( + "{}/api/v1/{}/ingest?commit=auto", + self.config.url, self.index_id + ); + info!("Ingesting messages for index: {}...", self.index_id); + let messages_count = messages.len(); + let messages = messages + .into_iter() + .filter_map(|record| simd_json::to_string(&record).ok()) + .collect::<Vec<_>>() + .join("\n"); + + let response = self + .client + .post(&url) + .body(messages) + .send() + .await + .map_err(|error| { + error!( + "Failed to send HTTP request to ingest messages for index: {}. 
{error}", + self.index_id + ); + Error::HttpRequestFailed(error.to_string()) + })?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + error!( + "Received an invalid HTTP response when ingesting messages for index: {}. Status code: {status}, reason: {text}", + self.index_id + ); + return Err(Error::HttpRequestFailed(format!( + "Status code: {status}, reason: {text}" + ))); + } + + info!( + "Ingested {messages_count} messages for index: {}", + self.index_id + ); + Ok(()) + } +} + +#[async_trait] +impl Sink for QuickwitSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Initialized QuickwitSink with ID: {} for URL: {}", + self.id, self.config.url + ); + if !self.has_index().await? { + self.create_index().await?; + } + Ok(()) + } + + async fn consume( + &self, + _topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + info!( + "Quickwit sink with ID: {} received: {} messages, format: {}", + self.id, + messages.len(), + messages_metadata.schema + ); + + let mut json_payloads = Vec::with_capacity(messages.len()); + for message in messages { + match message.payload { + Payload::Json(value) => json_payloads.push(value), + _ => { + warn!("Unsupported payload format: {}", messages_metadata.schema); + } + } + } + + if json_payloads.is_empty() { + return Ok(()); + } + + self.ingest(json_payloads).await?; + Ok(()) + } + + async fn close(&mut self) { + info!("Quickwit sink with ID: {} is shutting down", self.id); + } +} diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml new file mode 100644 index 00000000..1251732a --- /dev/null +++ b/core/connectors/sinks/stdout_sink/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_stdout_sink" +version = "0.1.0" +# NOTE(review): this is the generic Iggy platform description, not one specific to this connector. +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +# Built as a C-compatible dynamic library (cdylib); presumably loaded as a plugin +# by the connectors runtime — confirm. +[lib] +crate-type = ["cdylib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +serde = { workspace = true } +tracing = { workspace = true } + +# dashmap and once_cell are not referenced from src/lib.rs directly; presumably the +# code generated by `sink_connector!` needs them — confirm. Listing them here keeps +# cargo-machete from reporting them as unused. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] diff --git a/core/connectors/sinks/stdout_sink/src/lib.rs b/core/connectors/sinks/stdout_sink/src/lib.rs new file mode 100644 index 00000000..afa9df59 --- /dev/null +++ b/core/connectors/sinks/stdout_sink/src/lib.rs @@ -0,0 +1,88 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Sink, TopicMetadata, sink_connector, +}; +use serde::{Deserialize, Serialize}; +use tracing::info; + +sink_connector!(StdoutSink); + +#[derive(Debug)] +pub struct StdoutSink { + id: u32, + print_payload: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StdoutSinkConfig { + print_payload: Option<bool>, +} + +impl StdoutSink { + pub fn new(id: u32, config: StdoutSinkConfig) -> Self { + StdoutSink { + id, + print_payload: config.print_payload.unwrap_or(false), + } + } +} + +#[async_trait] +impl Sink for StdoutSink { + async fn open(&mut self) -> Result<(), Error> { + info!( + "Initialized stdout sink with ID: {}. 
Print payload: {}", + self.id, self.print_payload + ); + Ok(()) + } + + async fn consume( + &self, + topic_metadata: &TopicMetadata, + messages_metadata: MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + info!( + "Stdout sink with ID: {} received: {} messages, schema: {}, stream: {}, topic: {}, partition: {}, offset: {}", + self.id, + messages.len(), + messages_metadata.schema, + topic_metadata.stream, + topic_metadata.stream, + messages_metadata.partition_id, + messages_metadata.current_offset + ); + if self.print_payload { + for message in messages { + info!( + "Message offset: {}, payload: {:#?}", + message.offset, message.payload + ); + } + } + Ok(()) + } + + async fn close(&mut self) { + info!("Stdout sink with ID: {} is shutting down", self.id); + } +} diff --git a/core/connectors/sources/test_source/Cargo.toml b/core/connectors/sources/test_source/Cargo.toml new file mode 100644 index 00000000..f1faec68 --- /dev/null +++ b/core/connectors/sources/test_source/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iggy_connector_test_source" +version = "0.1.0" +# NOTE(review): this is the generic Iggy platform description, not one specific to this connector. +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +# Built as a C-compatible dynamic library (cdylib); presumably loaded as a plugin +# by the connectors runtime — confirm. +[lib] +crate-type = ["cdylib"] + +[dependencies] +async-trait = { workspace = true } +dashmap = { workspace = true } +humantime = { workspace = true } +iggy_connector_sdk = { workspace = true } +once_cell = { workspace = true } +rand = { workspace = true } +serde = { workspace = true } +simd-json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +# dashmap and once_cell are not referenced from src/lib.rs directly; presumably the +# code generated by `source_connector!` needs them — confirm. Listing them here keeps +# cargo-machete from reporting them as unused. +[package.metadata.cargo-machete] +ignored = ["dashmap", "once_cell"] diff --git a/core/connectors/sources/test_source/src/lib.rs b/core/connectors/sources/test_source/src/lib.rs new file mode 100644 index 00000000..9d8c4cf1 --- /dev/null +++ b/core/connectors/sources/test_source/src/lib.rs @@ -0,0 +1,157 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::{str::FromStr, time::Duration}; + +use async_trait::async_trait; +use iggy_connector_sdk::{ProducedMessage, ProducedMessages, Schema, Source, source_connector}; +use rand::{ + Rng, + distr::{Alphanumeric, Uniform}, +}; +use serde::{Deserialize, Serialize}; +use tokio::{sync::Mutex, time::sleep}; +use tracing::info; + +source_connector!(TestSource); + +#[derive(Debug)] +pub struct TestSource { + id: u32, + max_count: Option<usize>, + interval: Duration, + messages_range: (u32, u32), + payload_size: u32, + state: Mutex<State>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TestSourceConfig { + interval: Option<String>, + max_count: Option<usize>, + messages_range: Option<(u32, u32)>, + payload_size: Option<u32>, +} + +#[derive(Debug)] +struct State { + current_number: usize, +} + +impl TestSource { + pub fn new(id: u32, config: TestSourceConfig) -> Self { + let interval = config.interval.unwrap_or("1s".to_string()); + let interval = humantime::Duration::from_str(&interval) + .unwrap_or(humantime::Duration::from_str("1s").unwrap()); + TestSource { + id, + max_count: config.max_count, + interval: *interval, + messages_range: config.messages_range.unwrap_or((10, 50)), + payload_size: config.payload_size.unwrap_or(100), + state: Mutex::new(State { current_number: 0 }), + } + } + + fn generate_messages(&self) -> Vec<ProducedMessage> { + let mut messages = Vec::new(); + let mut rng = rand::rng(); + let messages_count = + rng.sample(Uniform::new(self.messages_range.0, self.messages_range.1).unwrap()); + for _ in 0..messages_count { + let record = Record { + title: "Hello".to_string(), + name: "World".to_string(), + age: 30, + text: self.generate_random_text(), + }; + let message = ProducedMessage { + id: None, + headers: None, + payload: simd_json::to_vec(&record).unwrap(), + }; + messages.push(message); + } + messages + } + + fn 
generate_random_text(&self) -> String { + let mut rng = rand::rng(); + let text: String = (0..self.payload_size) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + text + } +} + +#[async_trait] +impl Source for TestSource { + async fn open(&mut self) -> Result<(), iggy_connector_sdk::Error> { + info!( + "Initialized test source with ID {}. Interval: {:#?}, max offset: {:#?}, messages range: {} - {}, payload size: {}", + self.id, + self.interval, + self.max_count, + self.messages_range.0, + self.messages_range.1, + self.payload_size + ); + Ok(()) + } + + async fn poll(&self) -> Result<ProducedMessages, iggy_connector_sdk::Error> { + sleep(self.interval).await; + let mut state = self.state.lock().await; + if let Some(max_count) = self.max_count { + if state.current_number >= max_count { + info!( + "Reached max number of {max_count} messages for test source with ID {}", + self.id + ); + return Ok(ProducedMessages { + schema: Schema::Json, + messages: vec![], + }); + } + } + + let messages = self.generate_messages(); + state.current_number += messages.len(); + info!( + "Generated {} messages by test source with ID {}", + messages.len(), + self.id + ); + Ok(ProducedMessages { + schema: Schema::Json, + messages, + }) + } + + async fn close(&mut self) { + info!("Test source with ID {} is shutting down", self.id); + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct Record { + title: String, + name: String, + age: u32, + text: String, +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index e507ed47..0b4ee673 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -128,3 +128,6 @@ serial_test = { workspace = true } [[bin]] name = "iggy-server" path = "src/main.rs" + +[package.metadata.cargo-machete] +ignored = ["rust-s3"]
