This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch non_string_header_key in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 338b5a641bf807f60c27969c020f2ff08f7f017b Author: spetz <[email protected]> AuthorDate: Thu Jan 29 15:33:30 2026 +0100 feat(server,sdk): allow arbitrary (non-string) header key kind --- Cargo.lock | 36 +- Cargo.toml | 8 +- DEPENDENCIES.md | 36 +- core/ai/mcp/Cargo.toml | 2 +- core/bench/Cargo.toml | 2 +- core/binary_protocol/Cargo.toml | 2 +- .../src/cli/binary_message/poll_messages.rs | 4 +- core/cli/Cargo.toml | 2 +- core/common/Cargo.toml | 2 +- core/common/src/types/message/iggy_message.rs | 34 +- core/common/src/types/message/mod.rs | 5 +- core/common/src/types/message/user_headers.rs | 413 +++++++++++---------- core/connectors/runtime/Cargo.toml | 2 +- core/connectors/sdk/Cargo.toml | 2 +- .../connectors/sinks/elasticsearch_sink/Cargo.toml | 2 +- core/connectors/sinks/iceberg_sink/Cargo.toml | 2 +- core/connectors/sinks/postgres_sink/Cargo.toml | 2 +- core/connectors/sinks/quickwit_sink/Cargo.toml | 2 +- core/connectors/sinks/stdout_sink/Cargo.toml | 2 +- .../sources/elasticsearch_source/Cargo.toml | 2 +- core/connectors/sources/postgres_source/Cargo.toml | 2 +- core/connectors/sources/random_source/Cargo.toml | 2 +- .../tests/cli/message/test_message_poll_command.rs | 2 +- .../tests/cli/message/test_message_send_command.rs | 6 +- .../server/scenarios/create_message_payload.rs | 12 +- .../tests/server/scenarios/encryption_scenario.rs | 30 +- .../server/scenarios/message_headers_scenario.rs | 12 +- .../server/scenarios/message_size_scenario.rs | 2 +- .../tests/server/scenarios/offset_scenario.rs | 6 +- .../tests/server/scenarios/timestamp_scenario.rs | 6 +- core/sdk/Cargo.toml | 2 +- core/server/Cargo.toml | 2 +- core/tools/src/data-seeder/seeder.rs | 13 +- examples/rust/Cargo.toml | 10 +- examples/rust/README.md | 9 +- .../message-headers/message-type/consumer/main.rs | 2 +- .../message-headers/message-type/producer/main.rs | 2 +- .../consumer/main.rs | 44 +-- .../producer/main.rs | 56 ++- examples/rust/src/shared/codec.rs | 2 +- foreign/python/Cargo.toml | 4 +- .../lib/components/Modals/InspectMessage.svelte | 132 ++++++- .../RouteComponents/Settings/UsersTab.svelte | 8 +- web/src/lib/domain/Message.ts | 7 +- web/src/lib/domain/User.ts | 2 +- .../routes/dashboard/settings/users/+page.svelte | 10 +- 46 files changed, 555 insertions(+), 392 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc1ce8c7f..24c07e7db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4429,7 +4429,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.8.1-edge.7" +version = "0.8.2-edge.1" dependencies = [ "async-broadcast", "async-dropper", @@ -4461,7 +4461,7 @@ dependencies = [ [[package]] name = "iggy-bench" -version = "0.3.1-edge.2" +version = "0.3.2-edge.1" dependencies = [ "async-trait", "bench-report", @@ -4516,7 +4516,7 @@ dependencies = [ [[package]] name = "iggy-cli" -version = "0.10.1-edge.1" +version = "0.10.2-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", @@ -4536,7 +4536,7 @@ dependencies = [ [[package]] name = "iggy-connectors" -version = "0.2.1-edge.6" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "axum", @@ -4588,7 +4588,7 @@ dependencies = [ [[package]] name = "iggy-mcp" -version = "0.2.1-edge.5" +version = "0.2.2-edge.1" dependencies = [ "axum", "axum-server", @@ -4620,7 +4620,7 @@ dependencies = [ [[package]] name = "iggy_binary_protocol" -version = "0.8.1-edge.3" +version = "0.8.2-edge.1" dependencies = [ "anyhow", "async-broadcast", @@ -4641,7 +4641,7 @@ dependencies = [ [[package]] name = "iggy_common" -version = "0.8.1-edge.2" +version = "0.8.2-edge.1" dependencies = [ "aes-gcm", "ahash 0.8.12", @@ -4684,7 +4684,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4702,7 +4702,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "chrono", @@ -4720,7 +4720,7 @@ dependencies = [ [[package]] name = "iggy_connector_iceberg_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "arrow-json", "async-trait", @@ -4739,7 +4739,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "chrono", @@ -4758,7 +4758,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4779,7 +4779,7 @@ dependencies = [ [[package]] name = "iggy_connector_quickwit_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "dashmap", @@ -4794,7 +4794,7 @@ dependencies = [ [[package]] name = "iggy_connector_random_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "dashmap", @@ -4811,7 +4811,7 @@ dependencies = [ [[package]] name = "iggy_connector_sdk" -version = "0.1.1-edge.3" +version = "0.1.2-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4839,7 +4839,7 @@ dependencies = [ [[package]] name = "iggy_connector_stdout_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" dependencies = [ "async-trait", "dashmap", @@ -4852,7 +4852,7 @@ dependencies = [ [[package]] name = "iggy_examples" -version = "0.0.5" +version = "0.0.6" dependencies = [ "ahash 0.8.12", "anyhow", @@ -8247,7 +8247,7 @@ dependencies = [ [[package]] name = "server" -version = "0.6.1-edge.6" +version = "0.6.2-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index e6797945c..9ba1cf7bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,10 +140,10 @@ humantime = "2.3.0" hwlocality = "1.0.0-alpha.11" iceberg = "0.8.0" iceberg-catalog-rest = "0.8.0" -iggy = { path = "core/sdk", version = "0.8.1-edge.6" } -iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.1-edge.3" } -iggy_common = { path = "core/common", version = "0.8.1-edge.2" } -iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.3" } +iggy = { path = "core/sdk", version = "0.8.2-edge.1" } +iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.2-edge.1" } +iggy_common = { path = "core/common", version = "0.8.2-edge.1" } +iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.2-edge.1" } integration = { path = "core/integration" } keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] } lazy_static = "1.5.0" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index b5861ebea..306f3bdb7 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -385,24 +385,24 @@ icu_provider: 2.1.1, "Unicode-3.0", ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", -iggy: 0.8.1-edge.7, "Apache-2.0", -iggy-bench: 0.3.1-edge.2, "Apache-2.0", +iggy: 0.8.2-edge.1, "Apache-2.0", +iggy-bench: 0.3.2-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", -iggy-cli: 0.10.1-edge.1, "Apache-2.0", -iggy-connectors: 0.2.1-edge.6, "Apache-2.0", -iggy-mcp: 0.2.1-edge.5, "Apache-2.0", -iggy_binary_protocol: 0.8.1-edge.3, "Apache-2.0", -iggy_common: 0.8.1-edge.2, "Apache-2.0", -iggy_connector_elasticsearch_sink: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_elasticsearch_source: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_iceberg_sink: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_postgres_sink: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_postgres_source: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_quickwit_sink: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_random_source: 0.2.0-edge.1, "Apache-2.0", -iggy_connector_sdk: 0.1.1-edge.3, "Apache-2.0", -iggy_connector_stdout_sink: 0.2.0-edge.1, "Apache-2.0", -iggy_examples: 0.0.5, "Apache-2.0", +iggy-cli: 0.10.2-edge.1, "Apache-2.0", +iggy-connectors: 0.2.2-edge.1, "Apache-2.0", +iggy-mcp: 0.2.2-edge.1, "Apache-2.0", +iggy_binary_protocol: 0.8.2-edge.1, "Apache-2.0", +iggy_common: 0.8.2-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_sink: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_source: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_iceberg_sink: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_postgres_sink: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_postgres_source: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_quickwit_sink: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_random_source: 0.2.1-edge.1, "Apache-2.0", +iggy_connector_sdk: 0.1.2-edge.1, "Apache-2.0", +iggy_connector_stdout_sink: 0.2.1-edge.1, "Apache-2.0", +iggy_examples: 0.0.6, "Apache-2.0", ignore: 0.4.25, "MIT OR Unlicense", impl-more: 0.1.9, "Apache-2.0 OR MIT", implicit-clone: 0.6.0, "Apache-2.0 OR MIT", @@ -718,7 +718,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT", serde_yaml_ng: 0.10.0, "MIT", serial_test: 3.3.1, "MIT", serial_test_derive: 3.3.1, "MIT", -server: 0.6.1-edge.6, "Apache-2.0", +server: 0.6.2-edge.1, "Apache-2.0", sha1: 0.10.6, "Apache-2.0 OR MIT", sha2: 0.10.9, "Apache-2.0 OR MIT", sha3: 0.10.8, "Apache-2.0 OR MIT", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index 8cb193aa7..d7b781f5e 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-mcp" -version = "0.2.1-edge.5" +version = "0.2.2-edge.1" description = "MCP Server for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml index bbfbae035..ea475e412 100644 --- a/core/bench/Cargo.toml +++ b/core/bench/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-bench" -version = "0.3.1-edge.2" +version = "0.3.2-edge.1" edition = "2024" license = "Apache-2.0" repository = "https://github.com/apache/iggy" diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml index c015b5257..32e237bc4 100644 --- a/core/binary_protocol/Cargo.toml +++ b/core/binary_protocol/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_binary_protocol" -version = "0.8.1-edge.3" +version = "0.8.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/binary_protocol/src/cli/binary_message/poll_messages.rs b/core/binary_protocol/src/cli/binary_message/poll_messages.rs index d4867a4a3..dd210ddb8 100644 --- a/core/binary_protocol/src/cli/binary_message/poll_messages.rs +++ b/core/binary_protocol/src/cli/binary_message/poll_messages.rs @@ -113,7 +113,7 @@ impl PollMessagesCmd { let message_headers = header_key_set .iter() .map(|(key, kind)| { - Cell::new(format!("Header: {}\n{}", key.as_str(), kind)) + Cell::new(format!("Header: {}\n{}", key.to_string_value(), kind)) .set_alignment(CellAlignment::Center) }) .collect::<Vec<_>>(); @@ -147,7 +147,7 @@ impl PollMessagesCmd { .map(|h| { h.get(key) .filter(|v| v.kind == *kind) - .map(|v| v.value_only_to_string()) + .map(|v| v.to_string_value()) .unwrap_or_default() }) .unwrap_or_default() diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml index 98fd9b7f1..a7f2b5382 100644 --- a/core/cli/Cargo.toml +++ b/core/cli/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-cli" -version = "0.10.1-edge.1" +version = "0.10.2-edge.1" edition = "2024" authors = ["[email protected]"] repository = "https://github.com/apache/iggy" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index fbcb02cd7..07a82abc7 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] name = "iggy_common" -version = "0.8.1-edge.2" +version = "0.8.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 5998c3975..5bc464fc1 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -541,6 +541,7 @@ impl Serialize for IggyMessage { where S: Serializer, { + use super::user_headers::HeaderEntry; use base64::{Engine as _, engine::general_purpose::STANDARD}; use serde::ser::SerializeStruct; @@ -554,8 +555,13 @@ impl Serialize for IggyMessage { if self.user_headers.is_some() { let headers_map = self.user_headers_map().map_err(serde::ser::Error::custom)?; - - state.serialize_field("user_headers", &headers_map)?; + if let Some(map) = headers_map { + let entries: Vec<HeaderEntry> = map + .into_iter() + .map(|(key, value)| HeaderEntry { key, value }) + .collect(); + state.serialize_field("user_headers", &entries)?; + } } state.end() @@ -567,6 +573,7 @@ impl<'de> Deserialize<'de> for IggyMessage { where D: Deserializer<'de>, { + use super::user_headers::HeaderEntry; use serde::de::{self, MapAccess, Visitor}; use std::fmt; @@ -601,7 +608,12 @@ impl<'de> Deserialize<'de> for IggyMessage { payload = Some(Bytes::from(decoded)); } "user_headers" => { - user_headers = Some(map.next_value()?); + let entries: Vec<HeaderEntry> = map.next_value()?; + let mut headers_map = HashMap::new(); + for entry in entries { + headers_map.insert(entry.key, entry.value); + } + user_headers = Some(headers_map); } _ => { let _ = map.next_value::<de::IgnoredAny>()?; @@ -668,7 +680,7 @@ mod tests { fn test_create_with_headers() { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("content-type").unwrap(), + HeaderKey::from_string("content-type").unwrap(), HeaderValue::from_str("text/plain").unwrap(), ); @@ -682,7 +694,7 @@ mod tests { let headers_map = message.user_headers_map().unwrap().unwrap(); assert_eq!(headers_map.len(), 1); - assert!(headers_map.contains_key(&HeaderKey::new("content-type").unwrap())); + assert!(headers_map.contains_key(&HeaderKey::from_string("content-type").unwrap())); } #[test] @@ -724,11 +736,11 @@ mod tests { fn test_json_serialization_with_headers() { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("content-type").unwrap(), + HeaderKey::from_string("content-type").unwrap(), HeaderValue::from_str("text/plain").unwrap(), ); headers.insert( - HeaderKey::new("correlation-id").unwrap(), + HeaderKey::from_string("correlation-id").unwrap(), HeaderValue::from_str("123456").unwrap(), ); @@ -773,12 +785,12 @@ mod tests { assert_eq!(original_map.len(), deserialized_map.len()); assert_eq!( - original_map.get(&HeaderKey::new("content-type").unwrap()), - deserialized_map.get(&HeaderKey::new("content-type").unwrap()) + original_map.get(&HeaderKey::from_string("content-type").unwrap()), + deserialized_map.get(&HeaderKey::from_string("content-type").unwrap()) ); assert_eq!( - original_map.get(&HeaderKey::new("correlation-id").unwrap()), - deserialized_map.get(&HeaderKey::new("correlation-id").unwrap()) + original_map.get(&HeaderKey::from_string("correlation-id").unwrap()), + deserialized_map.get(&HeaderKey::from_string("correlation-id").unwrap()) ); } } diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index 5ab0dbf33..caa29640b 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -55,4 +55,7 @@ pub use partitioning_kind::PartitioningKind; pub use polled_messages::PolledMessages; pub use polling_kind::PollingKind; pub use polling_strategy::PollingStrategy; -pub use user_headers::{HeaderKey, HeaderKind, HeaderValue}; +pub use user_headers::{ + HeaderEntry, HeaderField, HeaderKey, HeaderKind, HeaderValue, KeyMarker, UserHeaders, + ValueMarker, deserialize_headers, serialize_headers, +}; diff --git a/core/common/src/types/message/user_headers.rs b/core/common/src/types/message/user_headers.rs index e5cfa37bf..3912fd88f 100644 --- a/core/common/src/types/message/user_headers.rs +++ b/core/common/src/types/message/user_headers.rs @@ -25,67 +25,29 @@ use serde_with::serde_as; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use std::str::FromStr; -/// Represents a header key with a unique name. The name is case-insensitive and wraps a string. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct HeaderKey(String); - -impl HeaderKey { - pub fn new(key: &str) -> Result<Self, IggyError> { - if key.is_empty() || key.len() > 255 { - return Err(IggyError::InvalidHeaderKey); - } - - Ok(Self(key.to_lowercase().to_string())) - } +pub type HeaderKey = HeaderField<KeyMarker>; +pub type HeaderValue = HeaderField<ValueMarker>; +pub type UserHeaders = HashMap<HeaderKey, HeaderValue>; - pub fn as_str(&self) -> &str { - &self.0 - } -} - -impl Display for HeaderKey { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}", self.as_str()) - } -} +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct KeyMarker; -impl Hash for HeaderKey { - fn hash<H: Hasher>(&self, state: &mut H) { - self.0.hash(state); - } -} +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ValueMarker; -impl FromStr for HeaderKey { - type Err = IggyError; - fn from_str(s: &str) -> Result<Self, Self::Err> { - Self::new(s) - } -} - -impl TryFrom<&str> for HeaderKey { - type Error = IggyError; - fn try_from(value: &str) -> Result<Self, Self::Error> { - Self::new(value) - } -} - -/// Represents a header value of a specific kind. -/// It consists of the following fields: -/// - `kind`: the kind of the header value. -/// - `value`: the value of the header. #[serde_as] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct HeaderValue { - /// The kind of the header value. +pub struct HeaderField<T> { pub kind: HeaderKind, - /// The binary value of the header payload. #[serde_as(as = "Base64")] pub value: Bytes, + #[serde(skip)] + _marker: PhantomData<T>, } -/// Represents the kind of a header value. #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] #[serde(rename_all = "snake_case")] pub enum HeaderKind { @@ -107,7 +69,6 @@ pub enum HeaderKind { } impl HeaderKind { - /// Returns the code of the header kind. pub fn as_code(&self) -> u8 { match self { HeaderKind::Raw => 1, @@ -128,7 +89,6 @@ impl HeaderKind { } } - /// Returns the header kind from the code. pub fn from_code(code: u8) -> Result<Self, IggyError> { match code { 1 => Ok(HeaderKind::Raw), @@ -175,13 +135,6 @@ impl FromStr for HeaderKind { } } -impl Display for HeaderValue { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}: ", self.kind)?; - write!(f, "{}", self.value_only_to_string()) - } -} - impl Display for HeaderKind { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match *self { @@ -204,28 +157,44 @@ impl Display for HeaderKind { } } -impl FromStr for HeaderValue { +impl<T> Display for HeaderField<T> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: ", self.kind)?; + write!(f, "{}", self.to_string_value()) + } +} + +impl<T> Hash for HeaderField<T> { + fn hash<H: Hasher>(&self, state: &mut H) { + self.kind.hash(state); + self.value.hash(state); + } +} + +impl<T> FromStr for HeaderField<T> { type Err = IggyError; fn from_str(s: &str) -> Result<Self, Self::Err> { - Self::from(HeaderKind::String, s.as_bytes()) + Self::from_string(s) } } -impl HeaderValue { - /// Creates a new header value from the specified kind and value. - /// The kind is parsed from the string representation. - /// The value is parsed from the string representation. +impl<T> TryFrom<&str> for HeaderField<T> { + type Error = IggyError; + fn try_from(value: &str) -> Result<Self, Self::Error> { + Self::from_string(value) + } +} + +impl<T> HeaderField<T> { pub fn from_kind_str_and_value_str(kind: &str, value: &str) -> Result<Self, IggyError> { let kind = HeaderKind::from_str(kind)?; Self::from_kind_and_value_str(kind, value) } - /// Creates a new header value from the specified kind and value. - /// The value is parsed from the string representation. pub fn from_kind_and_value_str(kind: HeaderKind, value: &str) -> Result<Self, IggyError> { match kind { HeaderKind::Raw => Self::from_raw(value.as_bytes()), - HeaderKind::String => Self::from_str(value), + HeaderKind::String => Self::from_string(value), HeaderKind::Bool => { Self::from_bool(value.parse().map_err(|_| IggyError::InvalidBooleanValue)?) } @@ -267,40 +236,37 @@ impl HeaderValue { } } } - /// Creates a new header value from the specified raw bytes. + pub fn from_raw(value: &[u8]) -> Result<Self, IggyError> { Self::from(HeaderKind::Raw, value) } - /// Returns the raw bytes of the header value. pub fn as_raw(&self) -> Result<&[u8], IggyError> { if self.kind != HeaderKind::Raw { return Err(IggyError::InvalidHeaderValue); } - Ok(&self.value) } - /// Returns the string representation of the header value. + pub fn from_string(value: &str) -> Result<Self, IggyError> { + Self::from(HeaderKind::String, value.as_bytes()) + } + pub fn as_str(&self) -> Result<&str, IggyError> { if self.kind != HeaderKind::String { return Err(IggyError::InvalidHeaderValue); } - std::str::from_utf8(&self.value).map_err(|_| IggyError::InvalidUtf8) } - /// Creates a new header value from the specified string. pub fn from_bool(value: bool) -> Result<Self, IggyError> { Self::from(HeaderKind::Bool, if value { &[1] } else { &[0] }) } - /// Returns the boolean representation of the header value. pub fn as_bool(&self) -> Result<bool, IggyError> { if self.kind != HeaderKind::Bool { return Err(IggyError::InvalidHeaderValue); } - match self.value[0] { 0 => Ok(false), 1 => Ok(true), @@ -308,246 +274,186 @@ impl HeaderValue { } } - /// Creates a new header value from the specified boolean. pub fn from_int8(value: i8) -> Result<Self, IggyError> { Self::from(HeaderKind::Int8, &value.to_le_bytes()) } - /// Returns the i8 representation of the header value. pub fn as_int8(&self) -> Result<i8, IggyError> { if self.kind != HeaderKind::Int8 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(i8::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified i8. pub fn from_int16(value: i16) -> Result<Self, IggyError> { Self::from(HeaderKind::Int16, &value.to_le_bytes()) } - /// Returns the i16 representation of the header value. pub fn as_int16(&self) -> Result<i16, IggyError> { if self.kind != HeaderKind::Int16 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(i16::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified i16. pub fn from_int32(value: i32) -> Result<Self, IggyError> { Self::from(HeaderKind::Int32, &value.to_le_bytes()) } - /// Returns the i32 representation of the header value. pub fn as_int32(&self) -> Result<i32, IggyError> { if self.kind != HeaderKind::Int32 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(i32::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified i32. pub fn from_int64(value: i64) -> Result<Self, IggyError> { Self::from(HeaderKind::Int64, &value.to_le_bytes()) } - /// Returns the i64 representation of the header value. pub fn as_int64(&self) -> Result<i64, IggyError> { if self.kind != HeaderKind::Int64 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(i64::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified i128. pub fn from_int128(value: i128) -> Result<Self, IggyError> { Self::from(HeaderKind::Int128, &value.to_le_bytes()) } - /// Returns the i128 representation of the header value. pub fn as_int128(&self) -> Result<i128, IggyError> { if self.kind != HeaderKind::Int128 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(i128::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified u8. pub fn from_uint8(value: u8) -> Result<Self, IggyError> { Self::from(HeaderKind::Uint8, &value.to_le_bytes()) } - /// Returns the u8 representation of the header value. pub fn as_uint8(&self) -> Result<u8, IggyError> { if self.kind != HeaderKind::Uint8 { return Err(IggyError::InvalidHeaderValue); } - Ok(self.value[0]) } - /// Creates a new header value from the specified u16. pub fn from_uint16(value: u16) -> Result<Self, IggyError> { Self::from(HeaderKind::Uint16, &value.to_le_bytes()) } - /// Returns the u16 representation of the header value. pub fn as_uint16(&self) -> Result<u16, IggyError> { if self.kind != HeaderKind::Uint16 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(u16::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified u32. pub fn from_uint32(value: u32) -> Result<Self, IggyError> { Self::from(HeaderKind::Uint32, &value.to_le_bytes()) } - /// Returns the u32 representation of the header value. pub fn as_uint32(&self) -> Result<u32, IggyError> { if self.kind != HeaderKind::Uint32 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(u32::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified u64. pub fn from_uint64(value: u64) -> Result<Self, IggyError> { Self::from(HeaderKind::Uint64, &value.to_le_bytes()) } - /// Returns the u64 representation of the header value. pub fn as_uint64(&self) -> Result<u64, IggyError> { if self.kind != HeaderKind::Uint64 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(u64::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified u128. pub fn from_uint128(value: u128) -> Result<Self, IggyError> { Self::from(HeaderKind::Uint128, &value.to_le_bytes()) } - /// Returns the u128 representation of the header value. pub fn as_uint128(&self) -> Result<u128, IggyError> { if self.kind != HeaderKind::Uint128 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(u128::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified f32. pub fn from_float32(value: f32) -> Result<Self, IggyError> { Self::from(HeaderKind::Float32, &value.to_le_bytes()) } - /// Returns the f32 representation of the header value. pub fn as_float32(&self) -> Result<f32, IggyError> { if self.kind != HeaderKind::Float32 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(f32::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified f64. pub fn from_float64(value: f64) -> Result<Self, IggyError> { Self::from(HeaderKind::Float64, &value.to_le_bytes()) } - /// Returns the f64 representation of the header value. pub fn as_float64(&self) -> Result<f64, IggyError> { if self.kind != HeaderKind::Float64 { return Err(IggyError::InvalidHeaderValue); } - let value = self.value.to_vec().try_into(); if value.is_err() { return Err(IggyError::InvalidHeaderValue); } - Ok(f64::from_le_bytes(value.unwrap())) } - /// Creates a new header value from the specified kind and value. - fn from(kind: HeaderKind, value: &[u8]) -> Result<Self, IggyError> { - if value.is_empty() || value.len() > 255 { - return Err(IggyError::InvalidHeaderValue); - } - - Ok(Self { - kind, - value: Bytes::from(value.to_vec()), - }) - } - - /// Returns the string representation of the header value without the kind. - pub fn value_only_to_string(&self) -> String { + pub fn to_string_value(&self) -> String { match self.kind { HeaderKind::Raw => format!("{:?}", self.value), - HeaderKind::String => format!("{}", String::from_utf8_lossy(&self.value)), + HeaderKind::String => String::from_utf8_lossy(&self.value).to_string(), HeaderKind::Bool => format!("{}", self.value[0] != 0), HeaderKind::Int8 => format!( "{}", @@ -599,6 +505,17 @@ impl HeaderValue { ), } } + + fn from(kind: HeaderKind, value: &[u8]) -> Result<Self, IggyError> { + if value.is_empty() || value.len() > 255 { + return Err(IggyError::InvalidHeaderValue); + } + Ok(Self { + kind, + value: Bytes::from(value.to_vec()), + _marker: PhantomData, + }) + } } impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { @@ -609,9 +526,10 @@ impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { let mut bytes = BytesMut::new(); for (key, value) in self { + bytes.put_u8(key.kind.as_code()); #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(key.0.len() as u32); - bytes.put_slice(key.0.as_bytes()); + bytes.put_u32_le(key.value.len() as u32); + bytes.put_slice(&key.value); bytes.put_u8(value.kind.as_code()); #[allow(clippy::cast_possible_truncation)] bytes.put_u32_le(value.value.len() as u32); @@ -632,6 +550,8 @@ impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { let mut headers = Self::new(); let mut position = 0; while position < bytes.len() { + let key_kind = HeaderKind::from_code(bytes[position])?; + position += 1; let key_length = u32::from_le_bytes( bytes[position..position + 4] .try_into() @@ -642,15 +562,10 @@ impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { return Err(IggyError::InvalidHeaderKey); } position += 4; - let key = match String::from_utf8(bytes[position..position + key_length].to_vec()) { - Ok(k) => k, - Err(e) => { - tracing::error!("Invalid header key: {e}"); - return Err(IggyError::InvalidHeaderKey); - } - }; + let key_value = bytes[position..position + key_length].to_vec(); position += key_length; - let kind = HeaderKind::from_code(bytes[position])?; + + let value_kind = HeaderKind::from_code(bytes[position])?; position += 1; let value_length = u32::from_le_bytes( bytes[position..position + 4] @@ -662,13 +577,19 @@ impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { return Err(IggyError::InvalidHeaderValue); } position += 4; - let value = bytes[position..position + value_length].to_vec(); + let value_value = bytes[position..position + value_length].to_vec(); position += value_length; + headers.insert( - HeaderKey(key), + HeaderKey { + kind: key_kind, + value: Bytes::from(key_value), + _marker: PhantomData, + }, HeaderValue { - kind, - value: Bytes::from(value), + kind: value_kind, + value: Bytes::from(value_value), + _marker: PhantomData, }, ); } @@ -677,17 +598,60 @@ impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { } } -/// Returns the size in bytes of the specified headers. pub fn get_user_headers_size(headers: &Option<HashMap<HeaderKey, HeaderValue>>) -> Option<u32> { let mut size = 0; if let Some(headers) = headers { for (key, value) in headers { - size += 4 + key.as_str().len() as u32 + 1 + 4 + value.value.len() as u32; + size += 1 + 4 + key.value.len() as u32 + 1 + 4 + value.value.len() as u32; } } Some(size) } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct HeaderEntry { + pub key: HeaderKey, + pub value: HeaderValue, +} + +pub fn serialize_headers<S>(headers: &Option<UserHeaders>, serializer: S) -> Result<S::Ok, S::Error> +where + S: serde::Serializer, +{ + use serde::ser::SerializeSeq; + + match headers { + Some(map) => { + let mut seq = serializer.serialize_seq(Some(map.len()))?; + for (key, value) in map { + seq.serialize_element(&HeaderEntry { + key: key.clone(), + value: value.clone(), + })?; + } + seq.end() + } + None => serializer.serialize_none(), + } +} + +pub fn deserialize_headers<'de, D>(deserializer: D) -> Result<Option<UserHeaders>, D::Error> +where + D: serde::Deserializer<'de>, +{ + let entries: Option<Vec<HeaderEntry>> = Option::deserialize(deserializer)?; + match entries { + Some(vec) => { + let mut map = UserHeaders::new(); + for entry in vec { + map.insert(entry.key, entry.value); + } + Ok(Some(map)) + } + None => Ok(None), + } +} + #[cfg(test)] mod tests { use super::*; @@ -695,32 +659,44 @@ mod tests { #[test] fn header_key_should_be_created_for_valid_value() { let value = "key-1"; - let header_key = HeaderKey::new(value); + let header_key = HeaderKey::from_string(value); assert!(header_key.is_ok()); - assert_eq!(header_key.unwrap().0, value); + let header_key = header_key.unwrap(); + assert_eq!(header_key.kind, HeaderKind::String); + assert_eq!(header_key.as_str().unwrap(), value); } #[test] fn header_key_should_not_be_created_for_empty_value() { let value = ""; - let header_key = HeaderKey::new(value); + let header_key = HeaderKey::from_string(value); assert!(header_key.is_err()); let error = header_key.unwrap_err(); - assert_eq!(error.as_code(), IggyError::InvalidHeaderKey.as_code()); + assert_eq!(error.as_code(), IggyError::InvalidHeaderValue.as_code()); } #[test] fn header_key_should_not_be_created_for_too_long_value() { let value = "a".repeat(256); - let header_key = HeaderKey::new(&value); + let header_key = HeaderKey::from_string(&value); assert!(header_key.is_err()); let error = header_key.unwrap_err(); - assert_eq!(error.as_code(), IggyError::InvalidHeaderKey.as_code()); + assert_eq!(error.as_code(), IggyError::InvalidHeaderValue.as_code()); + } + + #[test] + fn header_key_should_be_created_from_int32() { + let value = 12345i32; + let header_key = HeaderKey::from_int32(value); + assert!(header_key.is_ok()); + let header_key = header_key.unwrap(); + assert_eq!(header_key.kind, HeaderKind::Int32); + assert_eq!(header_key.as_int32().unwrap(), value); } #[test] fn header_value_should_not_be_created_for_empty_value() { - let header_value = HeaderValue::from(HeaderKind::Raw, &[]); + let header_value = HeaderValue::from_raw(&[]); assert!(header_value.is_err()); let error = header_value.unwrap_err(); assert_eq!(error.as_code(), IggyError::InvalidHeaderValue.as_code()); @@ -729,7 +705,7 @@ mod tests { #[test] fn header_value_should_not_be_created_for_too_long_value() { let value = b"a".repeat(256); - let header_value = HeaderValue::from(HeaderKind::Raw, &value); + let header_value = HeaderValue::from_raw(&value); assert!(header_value.is_err()); let error = header_value.unwrap_err(); assert_eq!(error.as_code(), IggyError::InvalidHeaderValue.as_code()); @@ -1133,102 +1109,102 @@ mod tests { } #[test] - fn value_only_to_string_for_string_kind() { + fn to_string_value_for_string_kind() { let header_value = HeaderValue::from_str("Hello").unwrap(); - assert_eq!(header_value.value_only_to_string(), "Hello"); + assert_eq!(header_value.to_string_value(), "Hello"); } #[test] - fn value_only_to_string_for_bool_kind() { + fn to_string_value_for_bool_kind() { let header_value = HeaderValue::from_bool(true).unwrap(); - assert_eq!(header_value.value_only_to_string(), "true"); + assert_eq!(header_value.to_string_value(), "true"); } #[test] - fn value_only_to_string_for_int8_kind() { + fn to_string_value_for_int8_kind() { let header_value = HeaderValue::from_int8(123).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123"); + assert_eq!(header_value.to_string_value(), "123"); } #[test] - fn value_only_to_string_for_int16_kind() { + fn to_string_value_for_int16_kind() { let header_value = HeaderValue::from_int16(12345).unwrap(); - assert_eq!(header_value.value_only_to_string(), "12345"); + assert_eq!(header_value.to_string_value(), "12345"); } #[test] - fn value_only_to_string_for_int32_kind() { + fn to_string_value_for_int32_kind() { let header_value = HeaderValue::from_int32(123456).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456"); + assert_eq!(header_value.to_string_value(), "123456"); } #[test] - fn value_only_to_string_for_int64_kind() { + fn to_string_value_for_int64_kind() { let header_value = HeaderValue::from_int64(123456789).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456789"); + assert_eq!(header_value.to_string_value(), "123456789"); } #[test] - fn value_only_to_string_for_int128_kind() { + fn to_string_value_for_int128_kind() { let header_value = HeaderValue::from_int128(123456789123456789).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456789123456789"); + assert_eq!(header_value.to_string_value(), "123456789123456789"); } #[test] - fn value_only_to_string_for_uint8_kind() { + fn to_string_value_for_uint8_kind() { let header_value = HeaderValue::from_uint8(123).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123"); + assert_eq!(header_value.to_string_value(), "123"); } #[test] - fn value_only_to_string_for_uint16_kind() { + fn to_string_value_for_uint16_kind() { let header_value = HeaderValue::from_uint16(12345).unwrap(); - assert_eq!(header_value.value_only_to_string(), "12345"); + assert_eq!(header_value.to_string_value(), "12345"); } #[test] - fn value_only_to_string_for_uint32_kind() { + fn to_string_value_for_uint32_kind() { let header_value = HeaderValue::from_uint32(123456).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456"); + assert_eq!(header_value.to_string_value(), "123456"); } #[test] - fn value_only_to_string_for_uint64_kind() { + fn to_string_value_for_uint64_kind() { let header_value = HeaderValue::from_uint64(123456789).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456789"); + assert_eq!(header_value.to_string_value(), "123456789"); } #[test] - fn value_only_to_string_for_uint128_kind() { + fn to_string_value_for_uint128_kind() { let header_value = HeaderValue::from_uint128(123456789123456789).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123456789123456789"); + assert_eq!(header_value.to_string_value(), "123456789123456789"); } #[test] - fn value_only_to_string_for_float32_kind() { + fn to_string_value_for_float32_kind() { let header_value = HeaderValue::from_float32(123.01).unwrap(); - assert_eq!(header_value.value_only_to_string(), "123.01"); + assert_eq!(header_value.to_string_value(), "123.01"); } #[test] - fn value_only_to_string_for_float64_kind() { + fn to_string_value_for_float64_kind() { let header_value = HeaderValue::from_float64(1234.01234).unwrap(); - assert_eq!(header_value.value_only_to_string(), "1234.01234"); + assert_eq!(header_value.to_string_value(), "1234.01234"); } #[test] fn should_be_serialized_as_bytes() { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key-1").unwrap(), + HeaderKey::from_string("key-1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 1").unwrap(), + HeaderKey::from_string("key 1").unwrap(), HeaderValue::from_uint64(12345).unwrap(), ); headers.insert( - HeaderKey::new("key_3").unwrap(), + HeaderKey::from_string("key_3").unwrap(), HeaderValue::from_bool(true).unwrap(), ); @@ -1237,22 +1213,31 @@ mod tests { let mut position = 0; let mut headers_count = 0; while position < bytes.len() { + let key_kind = HeaderKind::from_code(bytes[position]).unwrap(); + position += 1; let key_length = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()) as usize; position += 4; - let key = String::from_utf8(bytes[position..position + key_length].to_vec()).unwrap(); + let key_value = bytes[position..position + key_length].to_vec(); position += key_length; - let kind = HeaderKind::from_code(bytes[position]).unwrap(); + + let value_kind = HeaderKind::from_code(bytes[position]).unwrap(); position += 1; let value_length = u32::from_le_bytes(bytes[position..position + 4].try_into().unwrap()) as usize; position += 4; let value = bytes[position..position + value_length].to_vec(); position += value_length; - let header = headers.get(&HeaderKey::new(&key).unwrap()); + + let key = HeaderKey { + kind: key_kind, + value: Bytes::from(key_value), + _marker: PhantomData, + }; + let header = headers.get(&key); assert!(header.is_some()); let header = header.unwrap(); - assert_eq!(header.kind, kind); + assert_eq!(header.kind, value_kind); assert_eq!(header.value, value); headers_count += 1; } @@ -1264,22 +1249,23 @@ mod tests { fn should_be_deserialized_from_bytes() { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key-1").unwrap(), + HeaderKey::from_string("key-1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 2").unwrap(), + HeaderKey::from_string("key 2").unwrap(), HeaderValue::from_uint64(12345).unwrap(), ); headers.insert( - HeaderKey::new("key_3").unwrap(), + HeaderKey::from_string("key_3").unwrap(), HeaderValue::from_bool(true).unwrap(), ); let mut bytes = BytesMut::new(); for (key, value) in &headers { - bytes.put_u32_le(key.0.len() as u32); - bytes.put_slice(key.0.as_bytes()); + bytes.put_u8(key.kind.as_code()); + bytes.put_u32_le(key.value.len() as u32); + bytes.put_slice(&key.value); bytes.put_u8(value.kind.as_code()); bytes.put_u32_le(value.value.len() as u32); bytes.put_slice(&value.value); @@ -1299,4 +1285,29 @@ mod tests { assert_eq!(deserialized_value.value, value.value); } } + + #[test] + fn should_serialize_and_deserialize_typed_keys() { + let mut headers = HashMap::new(); + headers.insert( + HeaderKey::from_int32(123).unwrap(), + HeaderValue::from_str("Value for int key").unwrap(), + ); + headers.insert( + HeaderKey::from_uint64(999).unwrap(), + HeaderValue::from_bool(true).unwrap(), + ); + + let bytes = headers.to_bytes(); + let deserialized = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes).unwrap(); + + assert_eq!(deserialized.len(), headers.len()); + for (key, value) in &headers { + let deserialized_value = deserialized.get(key); + assert!(deserialized_value.is_some()); + let deserialized_value = deserialized_value.unwrap(); + assert_eq!(deserialized_value.kind, value.kind); + assert_eq!(deserialized_value.value, value.value); + } + } } diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 26c6fc9d3..270471abe 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-connectors" -version = "0.2.1-edge.6" +version = "0.2.2-edge.1" description = "Connectors runtime for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml index 530c5ba56..fced2f289 100644 --- a/core/connectors/sdk/Cargo.toml +++ b/core/connectors/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_sdk" -version = "0.1.1-edge.3" +version = "0.1.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml index 6d238eeca..633effbf0 100644 --- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml +++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy Elasticsearch sink connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml index 8eb33b345..4b63979de 100644 --- a/core/connectors/sinks/iceberg_sink/Cargo.toml +++ b/core/connectors/sinks/iceberg_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_iceberg_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" edition = "2024" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming"] diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml index 4c9549830..218a0635c 100644 --- a/core/connectors/sinks/postgres_sink/Cargo.toml +++ b/core/connectors/sinks/postgres_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml index 228b0e3be..51fa78f10 100644 --- a/core/connectors/sinks/quickwit_sink/Cargo.toml +++ b/core/connectors/sinks/quickwit_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_quickwit_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml index ce63bc1ba..bdceb6297 100644 --- a/core/connectors/sinks/stdout_sink/Cargo.toml +++ b/core/connectors/sinks/stdout_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_stdout_sink" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml index e871ba326..3f53386cb 100644 --- a/core/connectors/sources/elasticsearch_source/Cargo.toml +++ b/core/connectors/sources/elasticsearch_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy Elasticsearch source connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index 6c8245707..694235390 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml index ba5acc657..11b4d4a6e 100644 --- a/core/connectors/sources/random_source/Cargo.toml +++ b/core/connectors/sources/random_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_random_source" -version = "0.2.0-edge.1" +version = "0.2.1-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/integration/tests/cli/message/test_message_poll_command.rs b/core/integration/tests/cli/message/test_message_poll_command.rs index 2a4c2b4f3..69f52e336 100644 --- a/core/integration/tests/cli/message/test_message_poll_command.rs +++ b/core/integration/tests/cli/message/test_message_poll_command.rs @@ -197,7 +197,7 @@ impl IggyCmdTestCase for TestMessagePollCmd { status = status .stdout(contains(format!("Header: {}", self.headers.0))) .stdout(contains(self.headers.1.kind.to_string())) - .stdout(contains(self.headers.1.value_only_to_string()).count(self.message_count)) + .stdout(contains(self.headers.1.to_string_value()).count(self.message_count)) } // Check if messages are printed based on the strategy diff --git a/core/integration/tests/cli/message/test_message_send_command.rs b/core/integration/tests/cli/message/test_message_send_command.rs index 27227cd94..b0dcb27b6 100644 --- a/core/integration/tests/cli/message/test_message_send_command.rs +++ b/core/integration/tests/cli/message/test_message_send_command.rs @@ -109,7 +109,7 @@ impl TestMessageSendCmd { command.push( header .iter() - .map(|(k, v)| format!("{k}:{}:{}", v.kind, v.value_only_to_string())) + .map(|(k, v)| format!("{k}:{}:{}", v.kind, v.to_string_value())) .collect::<Vec<_>>() .join(","), ); @@ -340,11 +340,11 @@ pub async fn should_be_successful() { using_partitioning, Some(HashMap::from([ ( - HeaderKey::new("key1").unwrap(), + HeaderKey::from_string("key1").unwrap(), HeaderValue::from_kind_str_and_value_str("string", "value1").unwrap(), ), ( - HeaderKey::new("key2").unwrap(), + HeaderKey::from_string("key2").unwrap(), HeaderValue::from_kind_str_and_value_str("int32", "42").unwrap(), ), ])), diff --git a/core/integration/tests/server/scenarios/create_message_payload.rs b/core/integration/tests/server/scenarios/create_message_payload.rs index a35a18afb..20d1127af 100644 --- a/core/integration/tests/server/scenarios/create_message_payload.rs +++ b/core/integration/tests/server/scenarios/create_message_payload.rs @@ -82,7 +82,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(headers.len(), 3); assert_eq!( headers - .get(&HeaderKey::new("key_1").unwrap()) + .get(&HeaderKey::from_string("key_1").unwrap()) .unwrap() .as_str() .unwrap(), @@ -90,14 +90,14 @@ pub async fn run(client_factory: &dyn ClientFactory) { ); assert!( headers - .get(&HeaderKey::new("key 2").unwrap()) + .get(&HeaderKey::from_string("key 2").unwrap()) .unwrap() .as_bool() .unwrap(), ); assert_eq!( headers - .get(&HeaderKey::new("key-3").unwrap()) + .get(&HeaderKey::from_string("key-3").unwrap()) .unwrap() .as_uint64() .unwrap(), @@ -141,15 +141,15 @@ fn create_message_payload(offset: u64) -> Bytes { fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key_1").unwrap(), + HeaderKey::from_string("key_1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 2").unwrap(), + HeaderKey::from_string("key 2").unwrap(), HeaderValue::from_bool(true).unwrap(), ); headers.insert( - HeaderKey::new("key-3").unwrap(), + HeaderKey::from_string("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); headers diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs index 055f3ef46..23dd5c19c 100644 --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@ -98,19 +98,19 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp for i in 0..messages_per_batch { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("batch").unwrap(), + HeaderKey::from_string("batch").unwrap(), HeaderValue::from_uint64(1).unwrap(), ); headers.insert( - HeaderKey::new("index").unwrap(), + HeaderKey::from_string("index").unwrap(), HeaderValue::from_uint64(i).unwrap(), ); headers.insert( - HeaderKey::new("type").unwrap(), + HeaderKey::from_string("type").unwrap(), HeaderValue::from_str("test-message").unwrap(), ); headers.insert( - HeaderKey::new("encrypted").unwrap(), + HeaderKey::from_string("encrypted").unwrap(), HeaderValue::from_bool(encryption).unwrap(), ); @@ -195,7 +195,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp let headers = msg.user_headers_map().unwrap().unwrap(); assert_eq!( headers - .get(&HeaderKey::new("batch").unwrap()) + .get(&HeaderKey::from_string("batch").unwrap()) .unwrap() .as_uint64() .unwrap(), @@ -203,7 +203,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp ); assert_eq!( headers - .get(&HeaderKey::new("type").unwrap()) + .get(&HeaderKey::from_string("type").unwrap()) .unwrap() .as_str() .unwrap(), @@ -211,7 +211,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp ); assert_eq!( headers - .get(&HeaderKey::new("encrypted").unwrap()) + .get(&HeaderKey::from_string("encrypted").unwrap()) .unwrap() .as_bool() .unwrap(), @@ -243,19 +243,19 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp for i in 0..messages_per_batch { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("batch").unwrap(), + HeaderKey::from_string("batch").unwrap(), HeaderValue::from_uint64(2).unwrap(), ); headers.insert( - HeaderKey::new("index").unwrap(), + HeaderKey::from_string("index").unwrap(), HeaderValue::from_uint64(i).unwrap(), ); headers.insert( - HeaderKey::new("type").unwrap(), + HeaderKey::from_string("type").unwrap(), HeaderValue::from_str("test-message-after-restart").unwrap(), ); headers.insert( - HeaderKey::new("encrypted").unwrap(), + HeaderKey::from_string("encrypted").unwrap(), HeaderValue::from_bool(encryption).unwrap(), ); @@ -322,7 +322,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp assert!(msg.user_headers.is_some()); let headers = msg.user_headers_map().unwrap().unwrap(); let batch_num = headers - .get(&HeaderKey::new("batch").unwrap()) + .get(&HeaderKey::from_string("batch").unwrap()) .unwrap() .as_uint64() .unwrap(); @@ -331,7 +331,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp batch_1_count += 1; assert_eq!( headers - .get(&HeaderKey::new("type").unwrap()) + .get(&HeaderKey::from_string("type").unwrap()) .unwrap() .as_str() .unwrap(), @@ -339,7 +339,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp ); assert_eq!( headers - .get(&HeaderKey::new("encrypted").unwrap()) + .get(&HeaderKey::from_string("encrypted").unwrap()) .unwrap() .as_bool() .unwrap(), @@ -349,7 +349,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp batch_2_count += 1; assert_eq!( headers - .get(&HeaderKey::new("type").unwrap()) + .get(&HeaderKey::from_string("type").unwrap()) .unwrap() .as_str() .unwrap(), diff --git a/core/integration/tests/server/scenarios/message_headers_scenario.rs b/core/integration/tests/server/scenarios/message_headers_scenario.rs index 89c9aebee..4de9afb29 100644 --- a/core/integration/tests/server/scenarios/message_headers_scenario.rs +++ b/core/integration/tests/server/scenarios/message_headers_scenario.rs @@ -79,7 +79,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(headers.len(), 3); assert_eq!( headers - .get(&HeaderKey::new("key_1").unwrap()) + .get(&HeaderKey::from_string("key_1").unwrap()) .unwrap() .as_str() .unwrap(), @@ -87,14 +87,14 @@ pub async fn run(client_factory: &dyn ClientFactory) { ); assert!( headers - .get(&HeaderKey::new("key 2").unwrap()) + .get(&HeaderKey::from_string("key 2").unwrap()) .unwrap() .as_bool() .unwrap(), ); assert_eq!( headers - .get(&HeaderKey::new("key-3").unwrap()) + .get(&HeaderKey::from_string("key-3").unwrap()) .unwrap() .as_uint64() .unwrap(), @@ -131,15 +131,15 @@ fn create_message_payload(offset: u64) -> Bytes { fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key_1").unwrap(), + HeaderKey::from_string("key_1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 2").unwrap(), + HeaderKey::from_string("key 2").unwrap(), HeaderValue::from_bool(true).unwrap(), ); headers.insert( - HeaderKey::new("key-3").unwrap(), + HeaderKey::from_string("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); headers diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs index f2742c8fa..3b5f45278 100644 --- a/core/integration/tests/server/scenarios/message_size_scenario.rs +++ b/core/integration/tests/server/scenarios/message_size_scenario.rs @@ -221,7 +221,7 @@ fn create_message_header_of_size(target_size: usize) -> HashMap<HeaderKey, Heade remaining_size - total_overhead }; - let key = HeaderKey::new(key_str.as_str()).unwrap(); + let key = HeaderKey::from_string(key_str.as_str()).unwrap(); let value = HeaderValue::from_str(create_string_of_size(value_size).as_str()).unwrap(); let actual_header_size = 4 + key_str.len() + 1 + 4 + value_size; diff --git a/core/integration/tests/server/scenarios/offset_scenario.rs b/core/integration/tests/server/scenarios/offset_scenario.rs index cc519bd63..dd35892c8 100644 --- a/core/integration/tests/server/scenarios/offset_scenario.rs +++ b/core/integration/tests/server/scenarios/offset_scenario.rs @@ -174,15 +174,15 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key_1").unwrap(), + HeaderKey::from_string("key_1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 2").unwrap(), + HeaderKey::from_string("key 2").unwrap(), HeaderValue::from_bool(true).unwrap(), ); headers.insert( - HeaderKey::new("key-3").unwrap(), + HeaderKey::from_string("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); diff --git a/core/integration/tests/server/scenarios/timestamp_scenario.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs index 542ca17cc..d8ddfe109 100644 --- a/core/integration/tests/server/scenarios/timestamp_scenario.rs +++ b/core/integration/tests/server/scenarios/timestamp_scenario.rs @@ -198,15 +198,15 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage { let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("key_1").unwrap(), + HeaderKey::from_string("key_1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), ); headers.insert( - HeaderKey::new("key 2").unwrap(), + HeaderKey::from_string("key 2").unwrap(), HeaderValue::from_bool(true).unwrap(), ); headers.insert( - HeaderKey::new("key-3").unwrap(), + HeaderKey::from_string("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 3f16531c6..4fa28e16d 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.8.1-edge.7" +version = "0.8.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 8adfda896..c2644498d 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "server" -version = "0.6.1-edge.6" +version = "0.6.2-edge.1" edition = "2024" license = "Apache-2.0" diff --git a/core/tools/src/data-seeder/seeder.rs b/core/tools/src/data-seeder/seeder.rs index bbd176fbd..045cd3ffc 100644 --- a/core/tools/src/data-seeder/seeder.rs +++ b/core/tools/src/data-seeder/seeder.rs @@ -157,11 +157,16 @@ async fn send_messages(client: &IggyClient, streams: &[(String, u32)]) -> Result false => None, true => { let mut headers = HashMap::new(); - headers - .insert(HeaderKey::new("key 1")?, HeaderValue::from_str("value1")?); - headers.insert(HeaderKey::new("key-2")?, HeaderValue::from_bool(true)?); headers.insert( - HeaderKey::new("key_3")?, + HeaderKey::from_string("key 1")?, + HeaderValue::from_str("value1")?, + ); + headers.insert( + HeaderKey::from_string("key-2")?, + HeaderValue::from_bool(true)?, + ); + headers.insert( + HeaderKey::from_string("key_3")?, HeaderValue::from_uint64(123456)?, ); Some(headers) diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 6fd30e6b5..0dc7694ac 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_examples" -version = "0.0.5" +version = "0.0.6" edition = "2024" license = "Apache-2.0" @@ -78,6 +78,14 @@ path = "src/message-headers/message-compression/consumer/main.rs" name = "message-headers-compression-producer" path = "src/message-headers/message-compression/producer/main.rs" +[[example]] +name = "typed-headers-consumer" +path = "src/message-headers/typed-headers/consumer/main.rs" + +[[example]] +name = "typed-headers-producer" +path = "src/message-headers/typed-headers/producer/main.rs" + [[example]] name = "multi-tenant-consumer" path = "src/multi-tenant/consumer/main.rs" diff --git a/examples/rust/README.md b/examples/rust/README.md index 0f55671cc..4a546f73d 100644 --- a/examples/rust/README.md +++ b/examples/rust/README.md @@ -101,6 +101,8 @@ cargo run --example message-headers-type-producer cargo run --example message-headers-type-consumer ``` +Demonstrates using HeaderKey/HeaderValue for message metadata instead of payload-based typing, with header-based message routing. + Shows how user headers can be used for message compression in transit: ```bash @@ -108,7 +110,12 @@ cargo run --example message-headers-compression-producer cargo run --example message-headers-compression-consumer ``` -Demonstrates using HeaderKey/HeaderValue for message metadata instead of payload-based typing, with header-based message routing. +Demonstrates typed header keys and values with various data types (strings, integers, floats, booleans, raw bytes): + +```bash +cargo run --example typed-headers-producer +cargo run --example typed-headers-consumer +``` ### Message Envelopes diff --git a/examples/rust/src/message-headers/message-type/consumer/main.rs b/examples/rust/src/message-headers/message-type/consumer/main.rs index d3ad3ca64..bdc6bb451 100644 --- a/examples/rust/src/message-headers/message-type/consumer/main.rs +++ b/examples/rust/src/message-headers/message-type/consumer/main.rs @@ -51,7 +51,7 @@ fn handle_message(message: &IggyMessage) -> Result<(), Box<dyn Error>> { // The payload can be of any type as it is a raw byte array. In this case it's a JSON string. let payload = std::str::from_utf8(&message.payload)?; // The message type is stored in the custom message header. - let header_key = HeaderKey::new("message_type").unwrap(); + let header_key = HeaderKey::from_string("message_type").unwrap(); let headers_map = message.user_headers_map()?.unwrap(); let message_type = headers_map.get(&header_key).unwrap().as_str()?; info!( diff --git a/examples/rust/src/message-headers/message-type/producer/main.rs b/examples/rust/src/message-headers/message-type/producer/main.rs index 4b762c32c..680b56bd4 100644 --- a/examples/rust/src/message-headers/message-type/producer/main.rs +++ b/examples/rust/src/message-headers/message-type/producer/main.rs @@ -86,7 +86,7 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy // The message type will be stored in the custom message header. let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("message_type").unwrap(), + HeaderKey::from_string("message_type").unwrap(), HeaderValue::from_str(message_type).unwrap(), ); diff --git a/examples/rust/src/message-headers/message-type/consumer/main.rs b/examples/rust/src/message-headers/typed-headers/consumer/main.rs similarity index 60% copy from examples/rust/src/message-headers/message-type/consumer/main.rs copy to examples/rust/src/message-headers/typed-headers/consumer/main.rs index d3ad3ca64..ce9a3be76 100644 --- a/examples/rust/src/message-headers/message-type/consumer/main.rs +++ b/examples/rust/src/message-headers/typed-headers/consumer/main.rs @@ -19,24 +19,23 @@ use anyhow::Result; use iggy::prelude::*; use iggy_examples::shared::args::Args; -use iggy_examples::shared::messages::*; use iggy_examples::shared::system; use std::error::Error; use std::sync::Arc; -use tracing::{info, warn}; +use tracing::info; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let args = Args::parse_with_defaults("message-headers-consumer"); + let args = Args::parse_with_defaults("typed-headers-consumer"); Registry::default() .with(tracing_subscriber::fmt::layer()) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .init(); info!( - "Message headers consumer has started, selected transport: {}", + "Typed headers consumer has started, selected transport: {}", args.transport ); let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?); @@ -48,32 +47,25 @@ async fn main() -> Result<(), Box<dyn Error>> { } fn handle_message(message: &IggyMessage) -> Result<(), Box<dyn Error>> { - // The payload can be of any type as it is a raw byte array. In this case it's a JSON string. let payload = std::str::from_utf8(&message.payload)?; - // The message type is stored in the custom message header. - let header_key = HeaderKey::new("message_type").unwrap(); - let headers_map = message.user_headers_map()?.unwrap(); - let message_type = headers_map.get(&header_key).unwrap().as_str()?; + info!( - "Handling message type: {} at offset: {}...", - message_type, message.header.offset + "Message at offset: {}, payload: {}", + message.header.offset, payload ); - match message_type { - ORDER_CREATED_TYPE => { - let order_created = serde_json::from_str::<OrderCreated>(payload)?; - info!("{:#?}", order_created); - } - ORDER_CONFIRMED_TYPE => { - let order_confirmed = serde_json::from_str::<OrderConfirmed>(payload)?; - info!("{:#?}", order_confirmed); - } - ORDER_REJECTED_TYPE => { - let order_rejected = serde_json::from_str::<OrderRejected>(payload)?; - info!("{:#?}", order_rejected); - } - _ => { - warn!("Received unknown message type: {}", message_type); + + if let Some(headers_map) = message.user_headers_map()? { + info!("Headers ({}):", headers_map.len()); + for (key, value) in &headers_map { + info!( + " key: [kind={}, value={}] -> value: [kind={}, value={}]", + key.kind, + key.to_string_value(), + value.kind, + value.to_string_value() + ); } } + Ok(()) } diff --git a/examples/rust/src/message-headers/message-type/producer/main.rs b/examples/rust/src/message-headers/typed-headers/producer/main.rs similarity index 68% copy from examples/rust/src/message-headers/message-type/producer/main.rs copy to examples/rust/src/message-headers/typed-headers/producer/main.rs index 4b762c32c..60d86687a 100644 --- a/examples/rust/src/message-headers/message-type/producer/main.rs +++ b/examples/rust/src/message-headers/typed-headers/producer/main.rs @@ -20,11 +20,9 @@ use anyhow::Result; use bytes::Bytes; use iggy::prelude::*; use iggy_examples::shared::args::Args; -use iggy_examples::shared::messages_generator::MessagesGenerator; use iggy_examples::shared::system; use std::collections::HashMap; use std::error::Error; -use std::str::FromStr; use std::sync::Arc; use tracing::info; use tracing_subscriber::layer::SubscriberExt; @@ -33,13 +31,13 @@ use tracing_subscriber::{EnvFilter, Registry}; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { - let args = Args::parse_with_defaults("message-headers-producer"); + let args = Args::parse_with_defaults("typed-headers-producer"); Registry::default() .with(tracing_subscriber::fmt::layer()) .with(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) .init(); info!( - "Message headers producer has started, selected transport: {}", + "Typed headers producer has started, selected transport: {}", args.transport ); let client_provider_config = Arc::new(ClientProviderConfig::from_args(args.to_sdk_args())?); @@ -62,9 +60,10 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy let stream_id = args.stream_id.clone().try_into()?; let topic_id = args.topic_id.clone().try_into()?; let mut interval = interval.map(|interval| tokio::time::interval(interval.get_duration())); - let mut message_generator = MessagesGenerator::new(); let mut sent_batches = 0; + let mut message_id: u64 = 0; let partitioning = Partitioning::partition_id(args.partition_id); + loop { if args.message_batches_limit > 0 && sent_batches == args.message_batches_limit { info!("Sent {sent_batches} batches of messages, exiting."); @@ -76,33 +75,52 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy } let mut messages = Vec::new(); - let mut serializable_messages = Vec::new(); for _ in 0..args.messages_per_batch { - let serializable_message = message_generator.generate(); - // You can send the different message types to the same partition, or stick to the single type. - let message_type = serializable_message.get_message_type(); - let json = serializable_message.to_json(); + message_id += 1; - // The message type will be stored in the custom message header. let mut headers = HashMap::new(); headers.insert( - HeaderKey::new("message_type").unwrap(), - HeaderValue::from_str(message_type).unwrap(), + HeaderKey::from_string("event_type")?, + HeaderValue::from_string("user_action")?, + ); + headers.insert( + HeaderKey::from_uint32(1)?, + HeaderValue::from_uint64(message_id)?, + ); + headers.insert( + HeaderKey::from_string("important")?, + HeaderValue::from_bool(message_id.is_multiple_of(5))?, + ); + headers.insert( + HeaderKey::from_uint32(44)?, + HeaderValue::from_float64(message_id as f64 * 2.0)?, ); + headers.insert( + HeaderKey::from_string("trace_id")?, + HeaderValue::from_int128(message_id as i128 * 1_000_000_000_000)?, + ); + headers.insert( + HeaderKey::from_raw(&[0xDE, 0xAD])?, + HeaderValue::from_raw(&[0xBE, 0xEF, 0xCA, 0xFE])?, + ); + + let payload = + format!(r#"{{"message_id":{message_id},"content":"Hello from typed headers!"}}"#,); let message = IggyMessage::builder() - .payload(Bytes::from(json)) + .payload(Bytes::from(payload)) .user_headers(headers) - .build() - .unwrap(); + .build()?; messages.push(message); - // This is used for the logging purposes only. - serializable_messages.push(serializable_message); } + client .send_messages(&stream_id, &topic_id, &partitioning, &mut messages) .await?; sent_batches += 1; - info!("Sent messages: {:#?}", serializable_messages); + info!( + "Sent batch {} with {} messages (last message_id: {})", + sent_batches, args.messages_per_batch, message_id + ); } } diff --git a/examples/rust/src/shared/codec.rs b/examples/rust/src/shared/codec.rs index 30df429be..b5d5aa1dc 100644 --- a/examples/rust/src/shared/codec.rs +++ b/examples/rust/src/shared/codec.rs @@ -57,7 +57,7 @@ impl FromStr for Codec { impl Codec { /// Returns the key to indicate compressed messages as HeaderKey. pub fn header_key() -> HeaderKey { - HeaderKey::new(COMPRESSION_HEADER_KEY) + HeaderKey::from_string(COMPRESSION_HEADER_KEY) .expect("COMPRESSION_HEADER_KEY is an InvalidHeaderKey.") } diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index 1b7a5c6dc..eb8230fdc 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "apache-iggy" -version = "0.6.1-dev1" +version = "0.6.2-dev1" edition = "2021" authors = [ "Dario Lencina Talarico <[email protected]>", @@ -31,7 +31,7 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.10.1" futures = "0.3.31" -iggy = { path = "../../core/sdk", version = "0.8.1-edge.1" } +iggy = { path = "../../core/sdk", version = "0.8.2-edge.1" } pyo3 = "0.26.0" pyo3-async-runtimes = { version = "0.26.0", features = [ "attributes", diff --git a/web/src/lib/components/Modals/InspectMessage.svelte b/web/src/lib/components/Modals/InspectMessage.svelte index 8c841d21e..1e58c4f2b 100644 --- a/web/src/lib/components/Modals/InspectMessage.svelte +++ b/web/src/lib/components/Modals/InspectMessage.svelte @@ -20,9 +20,9 @@ <script lang="ts"> import ModalBase from './ModalBase.svelte'; import type { CloseModalFn } from '$lib/types/utilTypes'; - import { type Message, type HeaderField } from '$lib/domain/Message'; + import { type Message, type HeaderEntry, type HeaderField } from '$lib/domain/Message'; import MessageDecoder from '$lib/components/MessageDecoder/MessageDecoder.svelte'; - import { decodeBase64 } from '$lib/utils/base64Utils'; + import { formatMessageId } from '$lib/utils/formatters/uuidFormatter'; interface Props { @@ -34,22 +34,85 @@ const formattedId = $derived(message?.id ? formatMessageId(message.id) : 'N/A'); - const formatHeaders = (headers: Record<string, HeaderField> | null | undefined) => { - if (!headers || Object.keys(headers).length === 0) { - return 'No headers'; - } + const decodeHeaderValue = (kind: string, base64Value: string): string => { try { - return JSON.stringify(headers, null, 2); + const binaryString = atob(base64Value); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + const view = new DataView(bytes.buffer); + + switch (kind.toLowerCase()) { + case 'string': + return new TextDecoder().decode(bytes); + case 'bool': + return bytes[0] !== 0 ? 'true' : 'false'; + case 'int8': + return view.getInt8(0).toString(); + case 'int16': + return view.getInt16(0, true).toString(); + case 'int32': + return view.getInt32(0, true).toString(); + case 'int64': + return view.getBigInt64(0, true).toString(); + case 'int128': { + const lo = view.getBigUint64(0, true); + const hi = view.getBigInt64(8, true); + return (hi * BigInt(2 ** 64) + lo).toString(); + } + case 'uint8': + return view.getUint8(0).toString(); + case 'uint16': + return view.getUint16(0, true).toString(); + case 'uint32': + return view.getUint32(0, true).toString(); + case 'uint64': + return view.getBigUint64(0, true).toString(); + case 'uint128': { + const lo = view.getBigUint64(0, true); + const hi = view.getBigUint64(8, true); + return (hi * BigInt(2 ** 64) + lo).toString(); + } + case 'float32': + return view.getFloat32(0, true).toString(); + case 'float64': + return view.getFloat64(0, true).toString(); + default: + return base64Value; + } } catch { - return 'Invalid headers format'; + return base64Value; + } + }; + + let expandedHeaders: Set<number> = $state(new Set()); + + const toggleHeaderExpand = (index: number) => { + if (expandedHeaders.has(index)) { + expandedHeaders.delete(index); + } else { + expandedHeaders.add(index); } + expandedHeaders = new Set(expandedHeaders); + }; + + const findHeaderByStringKey = ( + headers: HeaderEntry[] | undefined, + keyName: string + ): HeaderField | undefined => { + if (!headers) return undefined; + const entry = headers.find( + (e) => e.key.kind === 'string' && decodeHeaderValue('string', e.key.value) === keyName + ); + return entry?.value; }; - // TODO: whether all header values should be decoded? let codec = $derived( - message?.user_headers?.['codec']?.value - ? decodeBase64(message.user_headers['codec'].value) - : undefined + (() => { + const codecHeader = findHeaderByStringKey(message?.user_headers, 'codec'); + return codecHeader ? decodeHeaderValue(codecHeader.kind, codecHeader.value) : undefined; + })() ); </script> @@ -75,7 +138,16 @@ <div class="bg-shade-l200 dark:bg-shade-d400 p-3 lg:p-4 rounded-md"> <span class="text-xs text-shade-l900 dark:text-shade-l700 block mb-1">Checksum</span> - <div class="text-sm text-color font-medium">{message?.checksum ?? 'N/A'}</div> + <div class="text-sm text-color font-medium font-mono"> + {#if message?.checksum != null} + 0x{BigInt(message.checksum).toString(16).toUpperCase()} + <span class="text-xs text-shade-l900 dark:text-shade-l700 ml-2"> + ({message.checksum}) + </span> + {:else} + N/A + {/if} + </div> </div> <div class="bg-shade-l200 dark:bg-shade-d400 p-3 lg:p-4 rounded-md"> @@ -90,8 +162,38 @@ <div class="bg-shade-l200 dark:bg-shade-d400 p-3 lg:p-4 rounded-md"> <span class="text-xs text-shade-l900 dark:text-shade-l700 block mb-1">Headers</span> - <div class="text-sm text-color font-medium font-mono whitespace-pre-wrap"> - {formatHeaders(message?.user_headers)} + <div class="text-sm text-color font-medium font-mono max-h-48 overflow-y-auto"> + {#if !message?.user_headers || message.user_headers.length === 0} + <span class="text-shade-l900 dark:text-shade-l700">No headers</span> + {:else} + <div class="flex flex-col gap-1"> + {#each message.user_headers as entry, index} + {@const keyValue = decodeHeaderValue(entry.key.kind, entry.key.value)} + {@const valueValue = decodeHeaderValue(entry.value.kind, entry.value.value)} + {@const isExpanded = expandedHeaders.has(index)} + <div class="flex flex-col"> + <div class="flex items-center gap-1"> + <button + type="button" + onclick={() => toggleHeaderExpand(index)} + class="text-shade-l900 dark:text-shade-l700 hover:text-color transition-colors text-xs w-4" + title="Show type details" + > + {isExpanded ? '▼' : '▶'} + </button> + <span class="text-color">{keyValue}</span> + <span class="text-shade-l900 dark:text-shade-l700 mx-1">→</span> + <span class="text-color">{valueValue}</span> + </div> + {#if isExpanded} + <div class="ml-5 text-xs text-shade-l900 dark:text-shade-l700"> + key: {entry.key.kind}, value: {entry.value.kind} + </div> + {/if} + </div> + {/each} + </div> + {/if} </div> </div> </div> diff --git a/web/src/lib/components/RouteComponents/Settings/UsersTab.svelte b/web/src/lib/components/RouteComponents/Settings/UsersTab.svelte index 77b98d633..0f5a60c43 100644 --- a/web/src/lib/components/RouteComponents/Settings/UsersTab.svelte +++ b/web/src/lib/components/RouteComponents/Settings/UsersTab.svelte @@ -74,13 +74,13 @@ const { checked } = e.target as HTMLInputElement; $selectedUsersId = checked - ? users.filter((user) => user.id !== 1).map((user) => `${user.id}`) + ? users.filter((user) => user.id !== 0).map((user) => `${user.id}`) : []; }; let allChecked = $derived( users - .filter((user) => user.id !== 1) + .filter((user) => user.id !== 0) .every((user) => $selectedUsersId.includes(user.id.toString())) ); </script> @@ -127,13 +127,13 @@ for="{row.id}-{row.username}" class={twMerge( baseClass, - row.id === 1 && 'bg-shade-l800 dark:bg-shade-d1000 pointer-events-none', + row.id === 0 && 'bg-shade-l800 dark:bg-shade-d1000 pointer-events-none', $selectedUsersId.includes(row.id.toString()) && 'ring-2 ring-inset ring-green500 bg-green-300/30! ' )} > <div class="flex items-center justify-center"> - {#if row.id !== 1} + {#if row.id !== 0} <Checkbox value={row.id.toString()} bind:bindGroup={$selectedUsersId} diff --git a/web/src/lib/domain/Message.ts b/web/src/lib/domain/Message.ts index d4d775ef4..a09acf3e1 100644 --- a/web/src/lib/domain/Message.ts +++ b/web/src/lib/domain/Message.ts @@ -34,7 +34,7 @@ export type Message = { user_headers_length: number; payload_length: number; formattedTimestamp: string; - user_headers: Record<string, HeaderField>; + user_headers: HeaderEntry[]; payload: string; truncatedPayload: string; }; @@ -44,6 +44,11 @@ export type HeaderField = { value: string; }; +export type HeaderEntry = { + key: HeaderField; + value: HeaderField; +}; + export function messageMapper(item: any): Message { const payload = item.payload; const truncatedPayload = payload.length > 30 ? `${payload.slice(0, 30)} [...]` : payload; diff --git a/web/src/lib/domain/User.ts b/web/src/lib/domain/User.ts index db4a248df..50e16fb48 100644 --- a/web/src/lib/domain/User.ts +++ b/web/src/lib/domain/User.ts @@ -31,6 +31,6 @@ export function userMapper(item: any): User { id: item.id, createdAt: formatDate(item.created_at), status: item.status, - username: `${item.username} ${item.id === 1 ? '(root)' : ''}` + username: `${item.username} ${item.id === 0 ? '(root)' : ''}` }; } diff --git a/web/src/routes/dashboard/settings/users/+page.svelte b/web/src/routes/dashboard/settings/users/+page.svelte index 3090690d3..d57db86c2 100644 --- a/web/src/routes/dashboard/settings/users/+page.svelte +++ b/web/src/routes/dashboard/settings/users/+page.svelte @@ -76,13 +76,13 @@ const { checked } = e.target as HTMLInputElement; $selectedUsersId = checked - ? data.users.filter((user) => user.id !== 1).map((user) => `${user.id}`) + ? data.users.filter((user) => user.id !== 0).map((user) => `${user.id}`) : []; }; let allChecked = $derived( data.users - .filter((user) => user.id !== 1) + .filter((user) => user.id !== 0) .every((user) => $selectedUsersId.includes(user.id.toString())) ); </script> @@ -173,13 +173,13 @@ for="{row.id}-{row.username}" class={twMerge( baseClass, - row.id === 1 && 'bg-shade-l800 dark:bg-shade-d1000 pointer-events-none', + row.id === 0 && 'bg-shade-l800 dark:bg-shade-d1000 pointer-events-none', $selectedUsersId.includes(row.id.toString()) && 'ring-2 ring-inset ring-green500 bg-green-300/30! ' )} > <div class="flex items-center justify-center"> - {#if row.id !== 1} + {#if row.id !== 0} <Checkbox value={row.id.toString()} bind:bindGroup={$selectedUsersId} @@ -213,7 +213,7 @@ </span> </div> <div class="px-5"> - {#if row.id !== 1} + {#if row.id !== 0} <StopPropagation> <DropdownMenu placement="left-start"> {#snippet trigger()}
