This is an automated email from the ASF dual-hosted git repository. piotr pushed a commit to branch iggy_header_reserved in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9f0621c3a31a70fd1d786d585f79a75dc89a6699 Author: spetz <[email protected]> AuthorDate: Sun Feb 1 08:53:17 2026 +0100 feat(server,sdk): add reserved u64 to iggy message header --- Cargo.lock | 34 ++++----- Cargo.toml | 8 +-- DEPENDENCIES.md | 34 ++++----- core/ai/mcp/Cargo.toml | 2 +- core/bench/Cargo.toml | 2 +- core/binary_protocol/Cargo.toml | 2 +- core/cli/Cargo.toml | 2 +- core/common/Cargo.toml | 2 +- core/common/src/types/message/iggy_message.rs | 1 + core/common/src/types/message/message_header.rs | 82 +++++++++++++++++++++- .../src/types/message/message_header_view.rs | 8 +++ core/connectors/runtime/Cargo.toml | 2 +- core/connectors/sdk/Cargo.toml | 2 +- .../connectors/sinks/elasticsearch_sink/Cargo.toml | 2 +- core/connectors/sinks/iceberg_sink/Cargo.toml | 2 +- core/connectors/sinks/postgres_sink/Cargo.toml | 2 +- core/connectors/sinks/quickwit_sink/Cargo.toml | 2 +- core/connectors/sinks/stdout_sink/Cargo.toml | 2 +- .../sources/elasticsearch_source/Cargo.toml | 2 +- core/connectors/sources/postgres_source/Cargo.toml | 2 +- core/connectors/sources/random_source/Cargo.toml | 2 +- core/sdk/Cargo.toml | 2 +- core/server/Cargo.toml | 2 +- .../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs | 7 +- foreign/csharp/Iggy_SDK/Iggy_SDK.csproj | 2 +- foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs | 14 ++-- foreign/csharp/Iggy_SDK/Messages/Message.cs | 4 +- foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs | 5 ++ .../Iggy_SDK/Utils/TcpMessageStreamHelpers.cs | 2 +- .../csharp/Iggy_SDK_Tests/Utils/BinaryFactory.cs | 5 +- foreign/go/contracts/message_header.go | 8 ++- foreign/java/gradle.properties | 2 +- .../main/java/org/apache/iggy/message/Message.java | 6 +- .../org/apache/iggy/message/MessageHeader.java | 6 +- .../org/apache/iggy/serde/BytesDeserializer.java | 5 +- .../org/apache/iggy/serde/BytesSerializer.java | 1 + .../client/blocking/tcp/BytesSerializerTest.java | 9 ++- .../apache/iggy/serde/BytesDeserializerTest.java | 9 ++- foreign/node/package-lock.json | 4 +- foreign/node/package.json | 2 +- foreign/node/src/wire/message/iggy-header.utils.ts | 35 ++++----- .../src/wire/message/send-messages.command.test.ts | 2 +- foreign/python/Cargo.toml | 4 +- 43 files changed, 225 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b43d62fe0..24850ca59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4423,7 +4423,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" dependencies = [ "async-broadcast", "async-dropper", @@ -4455,7 +4455,7 @@ dependencies = [ [[package]] name = "iggy-bench" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" dependencies = [ "async-trait", "bench-report", @@ -4510,7 +4510,7 @@ dependencies = [ [[package]] name = "iggy-cli" -version = "0.10.2-edge.1" +version = "0.10.3-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", @@ -4530,7 +4530,7 @@ dependencies = [ [[package]] name = "iggy-connectors" -version = "0.2.2-edge.1" +version = "0.2.3-edge.1" dependencies = [ "async-trait", "axum", @@ -4582,7 +4582,7 @@ dependencies = [ [[package]] name = "iggy-mcp" -version = "0.2.2-edge.1" +version = "0.2.3-edge.1" dependencies = [ "axum", "axum-server", @@ -4614,7 +4614,7 @@ dependencies = [ [[package]] name = "iggy_binary_protocol" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" dependencies = [ "anyhow", "async-broadcast", @@ -4635,7 +4635,7 @@ dependencies = [ [[package]] name = "iggy_common" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" dependencies = [ "aes-gcm", "ahash 0.8.12", @@ -4676,7 +4676,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4694,7 +4694,7 @@ dependencies = [ [[package]] name = "iggy_connector_elasticsearch_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "chrono", @@ -4712,7 +4712,7 @@ dependencies = [ [[package]] name = "iggy_connector_iceberg_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "arrow-json", "async-trait", @@ -4731,7 +4731,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "chrono", @@ -4750,7 +4750,7 @@ dependencies = [ [[package]] name = "iggy_connector_postgres_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4771,7 +4771,7 @@ dependencies = [ [[package]] name = "iggy_connector_quickwit_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "dashmap", @@ -4786,7 +4786,7 @@ dependencies = [ [[package]] name = "iggy_connector_random_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "dashmap", @@ -4803,7 +4803,7 @@ dependencies = [ [[package]] name = "iggy_connector_sdk" -version = "0.1.2-edge.1" +version = "0.1.3-edge.1" dependencies = [ "async-trait", "base64 0.22.1", @@ -4831,7 +4831,7 @@ dependencies = [ [[package]] name = "iggy_connector_stdout_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" dependencies = [ "async-trait", "dashmap", @@ -8243,7 +8243,7 @@ dependencies = [ [[package]] name = "server" -version = "0.6.2-edge.1" +version = "0.6.3-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 0be7d8a9c..ef364eb18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,10 +160,10 @@ humantime = "2.3.0" hwlocality = "1.0.0-alpha.11" iceberg = "0.8.0" iceberg-catalog-rest = "0.8.0" -iggy = { path = "core/sdk", version = "0.8.2-edge.2" } -iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.2-edge.2" } -iggy_common = { path = "core/common", version = "0.8.2-edge.2" } -iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.2-edge.1" } +iggy = { path = "core/sdk", version = "0.8.3-edge.1" } +iggy_binary_protocol = { path = "core/binary_protocol", version = "0.8.3-edge.1" } +iggy_common = { path = "core/common", version = "0.8.3-edge.1" } +iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.3-edge.1" } integration = { path = "core/integration" } journal = { path = "core/journal" } js-sys = "0.3" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index a7fd8c256..e77b15245 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -384,23 +384,23 @@ icu_provider: 2.1.1, "Unicode-3.0", ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", -iggy: 0.8.2-edge.2, "Apache-2.0", -iggy-bench: 0.3.2-edge.1, "Apache-2.0", +iggy: 0.8.3-edge.1, "Apache-2.0", +iggy-bench: 0.3.3-edge.1, "Apache-2.0", iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0", -iggy-cli: 0.10.2-edge.1, "Apache-2.0", -iggy-connectors: 0.2.2-edge.1, "Apache-2.0", -iggy-mcp: 0.2.2-edge.1, "Apache-2.0", -iggy_binary_protocol: 0.8.2-edge.2, "Apache-2.0", -iggy_common: 0.8.2-edge.2, "Apache-2.0", -iggy_connector_elasticsearch_sink: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_elasticsearch_source: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_iceberg_sink: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_postgres_sink: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_postgres_source: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_quickwit_sink: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_random_source: 0.2.1-edge.1, "Apache-2.0", -iggy_connector_sdk: 0.1.2-edge.1, "Apache-2.0", -iggy_connector_stdout_sink: 0.2.1-edge.1, "Apache-2.0", +iggy-cli: 0.10.3-edge.1, "Apache-2.0", +iggy-connectors: 0.2.3-edge.1, "Apache-2.0", +iggy-mcp: 0.2.3-edge.1, "Apache-2.0", +iggy_binary_protocol: 0.8.3-edge.1, "Apache-2.0", +iggy_common: 0.8.3-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_sink: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_elasticsearch_source: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_iceberg_sink: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_postgres_sink: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_postgres_source: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_quickwit_sink: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_random_source: 0.2.2-edge.1, "Apache-2.0", +iggy_connector_sdk: 0.1.3-edge.1, "Apache-2.0", +iggy_connector_stdout_sink: 0.2.2-edge.1, "Apache-2.0", iggy_examples: 0.0.6, "Apache-2.0", ignore: 0.4.25, "MIT OR Unlicense", impl-more: 0.1.9, "Apache-2.0 OR MIT", @@ -717,7 +717,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT", serde_yaml_ng: 0.10.0, "MIT", serial_test: 3.3.1, "MIT", serial_test_derive: 3.3.1, "MIT", -server: 0.6.2-edge.1, "Apache-2.0", +server: 0.6.3-edge.1, "Apache-2.0", sha1: 0.10.6, "Apache-2.0 OR MIT", sha2: 0.10.9, "Apache-2.0 OR MIT", sha3: 0.10.8, "Apache-2.0 OR MIT", diff --git a/core/ai/mcp/Cargo.toml b/core/ai/mcp/Cargo.toml index ac05978a5..9dcfc45cb 100644 --- a/core/ai/mcp/Cargo.toml +++ b/core/ai/mcp/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-mcp" -version = "0.2.2-edge.1" +version = "0.2.3-edge.1" description = "MCP Server for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml index 459ed102f..294a47dd7 100644 --- a/core/bench/Cargo.toml +++ b/core/bench/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-bench" -version = "0.3.2-edge.1" +version = "0.3.3-edge.1" edition = "2024" license = "Apache-2.0" repository = "https://github.com/apache/iggy" diff --git a/core/binary_protocol/Cargo.toml b/core/binary_protocol/Cargo.toml index a5553ff51..be1171ce3 100644 --- a/core/binary_protocol/Cargo.toml +++ b/core/binary_protocol/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_binary_protocol" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml index 7a669b8fc..71232bc5a 100644 --- a/core/cli/Cargo.toml +++ b/core/cli/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-cli" -version = "0.10.2-edge.1" +version = "0.10.3-edge.1" edition = "2024" authors = ["[email protected]"] repository = "https://github.com/apache/iggy" diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 74f1b27a0..783a6aa23 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -16,7 +16,7 @@ # under the License. [package] name = "iggy_common" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 49fe0b89b..57a95b648 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -190,6 +190,7 @@ impl IggyMessage { origin_timestamp: IggyTimestamp::now().as_micros(), user_headers_length, payload_length: payload.len() as u32, + _reserved: 0, }; let user_headers = user_headers.map(|h| h.to_bytes()); diff --git a/core/common/src/types/message/message_header.rs b/core/common/src/types/message/message_header.rs index 120f779e6..f0cffdeda 100644 --- a/core/common/src/types/message/message_header.rs +++ b/core/common/src/types/message/message_header.rs @@ -21,7 +21,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::ops::Range; -pub const IGGY_MESSAGE_HEADER_SIZE: usize = 8 + 16 + 8 + 8 + 8 + 4 + 4; +pub const IGGY_MESSAGE_HEADER_SIZE: usize = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8; pub const IGGY_MESSAGE_HEADER_RANGE: Range<usize> = 0..IGGY_MESSAGE_HEADER_SIZE; pub const IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE: Range<usize> = 0..8; @@ -31,6 +31,7 @@ pub const IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE: Range<usize> = 32..40; pub const IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE: Range<usize> = 40..48; pub const IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE: Range<usize> = 48..52; pub const IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE: Range<usize> = 52..56; +pub const IGGY_MESSAGE_RESERVED_OFFSET_RANGE: Range<usize> = 56..64; #[derive(Debug, Serialize, Deserialize, PartialEq, Default)] pub struct IggyMessageHeader { @@ -41,6 +42,8 @@ pub struct IggyMessageHeader { pub origin_timestamp: u64, pub user_headers_length: u32, pub payload_length: u32, + // Reserved for future use + pub _reserved: u64, } impl Sizeable for IggyMessageHeader { @@ -91,6 +94,11 @@ impl IggyMessageHeader { .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ), + _reserved: u64::from_le_bytes( + bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ), }) } } @@ -105,6 +113,7 @@ impl BytesSerializable for IggyMessageHeader { bytes.put_u64_le(self.origin_timestamp); bytes.put_u32_le(self.user_headers_length); bytes.put_u32_le(self.payload_length); + bytes.put_u64_le(self._reserved); bytes.freeze() } @@ -155,6 +164,12 @@ impl BytesSerializable for IggyMessageHeader { .map_err(|_| IggyError::InvalidNumberEncoding)?, ); + let _reserved = u64::from_le_bytes( + bytes[IGGY_MESSAGE_RESERVED_OFFSET_RANGE] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + Ok(IggyMessageHeader { checksum, id, @@ -163,6 +178,71 @@ impl BytesSerializable for IggyMessageHeader { origin_timestamp, user_headers_length: headers_length, payload_length, + _reserved, }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_serialize_and_deserialize_header() { + let header = IggyMessageHeader { + checksum: 123456789, + id: 987654321, + offset: 100, + timestamp: 1000000, + origin_timestamp: 999999, + user_headers_length: 50, + payload_length: 200, + _reserved: 0, + }; + + let bytes = header.to_bytes(); + assert_eq!(bytes.len(), IGGY_MESSAGE_HEADER_SIZE); + + let deserialized = IggyMessageHeader::from_bytes(bytes).unwrap(); + assert_eq!(header, deserialized); + } + + #[test] + fn should_serialize_header_to_correct_size() { + let header = IggyMessageHeader::default(); + let bytes = header.to_bytes(); + assert_eq!(bytes.len(), IGGY_MESSAGE_HEADER_SIZE); + assert_eq!(bytes.len(), 64); + } + + #[test] + fn should_fail_to_deserialize_invalid_size() { + let bytes = Bytes::from(vec![0u8; 56]); + let result = IggyMessageHeader::from_bytes(bytes); + assert!(result.is_err()); + } + + #[test] + fn should_deserialize_from_raw_bytes() { + let header = IggyMessageHeader { + checksum: 111, + id: 222, + offset: 333, + timestamp: 444, + origin_timestamp: 555, + user_headers_length: 66, + payload_length: 77, + _reserved: 0, + }; + + let bytes = header.to_bytes(); + let deserialized = IggyMessageHeader::from_raw_bytes(&bytes).unwrap(); + assert_eq!(header.checksum, deserialized.checksum); + assert_eq!(header.id, deserialized.id); + assert_eq!(header.offset, deserialized.offset); + assert_eq!(header.timestamp, deserialized.timestamp); + assert_eq!(header.origin_timestamp, deserialized.origin_timestamp); + assert_eq!(header.user_headers_length, deserialized.user_headers_length); + assert_eq!(header.payload_length, deserialized.payload_length); + } +} diff --git a/core/common/src/types/message/message_header_view.rs b/core/common/src/types/message/message_header_view.rs index 94a8a690a..263a3d179 100644 --- a/core/common/src/types/message/message_header_view.rs +++ b/core/common/src/types/message/message_header_view.rs @@ -24,6 +24,7 @@ use crate::{ IGGY_MESSAGE_OFFSET_OFFSET_RANGE, IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE, IggyMessageHeader, error::IggyError, + types::message::message_header::IGGY_MESSAGE_RESERVED_OFFSET_RANGE, }; /// A read-only, typed view into a message header in a raw buffer. @@ -87,6 +88,12 @@ impl<'a> IggyMessageHeaderView<'a> { u32::from_le_bytes(bytes.try_into().unwrap()) as usize } + /// The reserved field (8 bytes) + pub fn _reserved(&self) -> u64 { + let bytes = &self.data[IGGY_MESSAGE_RESERVED_OFFSET_RANGE]; + u64::from_le_bytes(bytes.try_into().unwrap()) + } + /// Convert this view to a full IggyMessageHeader struct pub fn to_header(&self) -> IggyMessageHeader { IggyMessageHeader { @@ -97,6 +104,7 @@ impl<'a> IggyMessageHeaderView<'a> { origin_timestamp: self.origin_timestamp(), user_headers_length: self.user_headers_length() as u32, payload_length: self.payload_length() as u32, + _reserved: self._reserved(), } } } diff --git a/core/connectors/runtime/Cargo.toml b/core/connectors/runtime/Cargo.toml index 270471abe..41e7a3808 100644 --- a/core/connectors/runtime/Cargo.toml +++ b/core/connectors/runtime/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy-connectors" -version = "0.2.2-edge.1" +version = "0.2.3-edge.1" description = "Connectors runtime for Iggy message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sdk/Cargo.toml b/core/connectors/sdk/Cargo.toml index 4d03859c4..b42909386 100644 --- a/core/connectors/sdk/Cargo.toml +++ b/core/connectors/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_sdk" -version = "0.1.2-edge.1" +version = "0.1.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/elasticsearch_sink/Cargo.toml b/core/connectors/sinks/elasticsearch_sink/Cargo.toml index 7ede4457a..6201be03d 100644 --- a/core/connectors/sinks/elasticsearch_sink/Cargo.toml +++ b/core/connectors/sinks/elasticsearch_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy Elasticsearch sink connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/iceberg_sink/Cargo.toml b/core/connectors/sinks/iceberg_sink/Cargo.toml index 4b63979de..d175ac70d 100644 --- a/core/connectors/sinks/iceberg_sink/Cargo.toml +++ b/core/connectors/sinks/iceberg_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_iceberg_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" edition = "2024" license = "Apache-2.0" keywords = ["iggy", "messaging", "streaming"] diff --git a/core/connectors/sinks/postgres_sink/Cargo.toml b/core/connectors/sinks/postgres_sink/Cargo.toml index bfc5ead2c..a24a68cf3 100644 --- a/core/connectors/sinks/postgres_sink/Cargo.toml +++ b/core/connectors/sinks/postgres_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy PostgreSQL sink connector for storing stream messages into PostgreSQL database" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/quickwit_sink/Cargo.toml b/core/connectors/sinks/quickwit_sink/Cargo.toml index 51fa78f10..360d9f919 100644 --- a/core/connectors/sinks/quickwit_sink/Cargo.toml +++ b/core/connectors/sinks/quickwit_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_quickwit_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sinks/stdout_sink/Cargo.toml b/core/connectors/sinks/stdout_sink/Cargo.toml index bdceb6297..56a36028b 100644 --- a/core/connectors/sinks/stdout_sink/Cargo.toml +++ b/core/connectors/sinks/stdout_sink/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_stdout_sink" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/elasticsearch_source/Cargo.toml b/core/connectors/sources/elasticsearch_source/Cargo.toml index 206a9ee67..6b19b6bc4 100644 --- a/core/connectors/sources/elasticsearch_source/Cargo.toml +++ b/core/connectors/sources/elasticsearch_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_elasticsearch_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy Elasticsearch source connector" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/postgres_source/Cargo.toml b/core/connectors/sources/postgres_source/Cargo.toml index 4c5ed6bcc..0ce9b4968 100644 --- a/core/connectors/sources/postgres_source/Cargo.toml +++ b/core/connectors/sources/postgres_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_postgres_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy PostgreSQL source connector supporting CDC and table polling for message streaming platform" edition = "2024" license = "Apache-2.0" diff --git a/core/connectors/sources/random_source/Cargo.toml b/core/connectors/sources/random_source/Cargo.toml index 11b4d4a6e..092c31cbb 100644 --- a/core/connectors/sources/random_source/Cargo.toml +++ b/core/connectors/sources/random_source/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy_connector_random_source" -version = "0.2.1-edge.1" +version = "0.2.2-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 5479fdf7a..deb6f4568 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.8.2-edge.2" +version = "0.8.3-edge.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 6789e6cb0..d0740fb5c 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "server" -version = "0.6.2-edge.1" +version = "0.6.3-edge.1" edition = "2024" license = "Apache-2.0" diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs index f2753e51c..1a0da05f2 100644 --- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs +++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs @@ -420,17 +420,18 @@ internal static class TcpContracts message.Header.OriginTimestamp); BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 48)..(position + 52)], headersBytes.Length); BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 52)..(position + 56)], message.Payload.Length); + BinaryPrimitives.WriteUInt64LittleEndian(bytes[(position + 56)..(position + 64)], message.Header.Reserved); - message.Payload.CopyTo(bytes[(position + 56)..(position + 56 + message.Header.PayloadLength)]); + message.Payload.CopyTo(bytes[(position + 64)..(position + 64 + message.Header.PayloadLength)]); if (headersBytes.Length > 0) { headersBytes .CopyTo(bytes[ - (position + 56 + message.Header.PayloadLength)..(position + 56 + message.Header.PayloadLength + + (position + 64 + message.Header.PayloadLength)..(position + 64 + message.Header.PayloadLength + headersBytes.Length)]); } - position += 56 + message.Header.PayloadLength + headersBytes.Length; + position += 64 + message.Header.PayloadLength + headersBytes.Length; msgSize += message.GetSize() + headersBytes.Length; diff --git a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj index cf7adcb5f..634855ae1 100644 --- a/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj +++ b/foreign/csharp/Iggy_SDK/Iggy_SDK.csproj @@ -7,7 +7,7 @@ <TargetFrameworks>net8.0;net10.0</TargetFrameworks> <AssemblyName>Apache.Iggy</AssemblyName> <RootNamespace>Apache.Iggy</RootNamespace> - <PackageVersion>0.6.2-edge.1</PackageVersion> + <PackageVersion>0.6.3-edge.1</PackageVersion> <GenerateDocumentationFile>true</GenerateDocumentationFile> </PropertyGroup> diff --git a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs index 63b8376a9..576204e4e 100644 --- a/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs +++ b/foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs @@ -29,7 +29,7 @@ namespace Apache.Iggy.Mappers; internal static class BinaryMapper { - private const int PropertiesSize = 56; + private const int PropertiesSize = 64; internal static RawPersonalAccessToken MapRawPersonalAccessToken(ReadOnlySpan<byte> payload) { @@ -359,17 +359,18 @@ internal static class BinaryMapper var originTimestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 40)..(position + 48)]); var headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 48)..(position + 52)]); var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 52)..(position + 56)]); + var reserved = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 56)..(position + 64)]); Dictionary<HeaderKey, HeaderValue>? headers = headersLength switch { 0 => null, > 0 => MapHeaders( - payload[(position + 56 + payloadLength)..(position + 56 + payloadLength + headersLength)]), + payload[(position + 64 + payloadLength)..(position + 64 + payloadLength + headersLength)]), < 0 => throw new ArgumentOutOfRangeException() }; - var payloadRangeStart = position + 56; - var payloadRangeEnd = position + 56 + payloadLength; + var payloadRangeStart = position + 64; + var payloadRangeEnd = position + 64 + payloadLength; if (payloadRangeStart > length || payloadRangeEnd > length) { break; @@ -393,7 +394,8 @@ internal static class BinaryMapper OriginTimestamp = originTimestamp, PayloadLength = payloadLength, Timestamp = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(timestamp), - UserHeadersLength = headersLength + UserHeadersLength = headersLength, + Reserved = reserved }, UserHeaders = headers, Payload = decryptor is not null @@ -406,7 +408,7 @@ internal static class BinaryMapper ArrayPool<byte>.Shared.Return(messagePayload); } - position += 56 + payloadLength + headersLength; + position += 64 + payloadLength + headersLength; if (position + PropertiesSize >= length) { break; diff --git a/foreign/csharp/Iggy_SDK/Messages/Message.cs b/foreign/csharp/Iggy_SDK/Messages/Message.cs index 3ae23d7e3..1a0e7e697 100644 --- a/foreign/csharp/Iggy_SDK/Messages/Message.cs +++ b/foreign/csharp/Iggy_SDK/Messages/Message.cs @@ -96,8 +96,8 @@ public class Message /// <returns></returns> public int GetSize() { - //return 56 + Payload.Length + (UserHeaders?.Count ?? 0); - return 56 + Payload.Length; + //return 64 + Payload.Length + (UserHeaders?.Count ?? 0); + return 64 + Payload.Length; } private ulong CalculateChecksum(byte[] bytes) diff --git a/foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs b/foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs index 32ed5e1cf..d20e288d8 100644 --- a/foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs +++ b/foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs @@ -60,4 +60,9 @@ public class MessageHeader /// Length of the payload. /// </summary> public int PayloadLength { get; set; } + + /// <summary> + /// Reserved for future use. + /// </summary> + public ulong Reserved { get; set; } } diff --git a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs index 22445d37d..c99337b4d 100644 --- a/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs +++ b/foreign/csharp/Iggy_SDK/Utils/TcpMessageStreamHelpers.cs @@ -48,7 +48,7 @@ internal static class TcpMessageStreamHelpers var bytesCount = 0; foreach (var message in messages) { - bytesCount += 16 + 56 + message.Payload.Length; + bytesCount += 16 + 64 + message.Payload.Length; if (message.UserHeaders is null) { continue; diff --git a/foreign/csharp/Iggy_SDK_Tests/Utils/BinaryFactory.cs b/foreign/csharp/Iggy_SDK_Tests/Utils/BinaryFactory.cs index daa787552..78e5c595e 100644 --- a/foreign/csharp/Iggy_SDK_Tests/Utils/BinaryFactory.cs +++ b/foreign/csharp/Iggy_SDK_Tests/Utils/BinaryFactory.cs @@ -46,7 +46,7 @@ internal sealed class BinaryFactory Guid guid, ReadOnlySpan<byte> payload) { var messageLength = payload.Length; - var totalSize = 56 + payload.Length; + var totalSize = 64 + payload.Length; Span<byte> payloadBuffer = new byte[totalSize].AsSpan(); BinaryPrimitives.WriteUInt64LittleEndian(payloadBuffer[..8], checkSum); @@ -56,8 +56,9 @@ internal sealed class BinaryFactory BinaryPrimitives.WriteUInt64LittleEndian(payloadBuffer[40..48], timestamp); BinaryPrimitives.WriteInt32LittleEndian(payloadBuffer[48..52], headersLength); BinaryPrimitives.WriteInt32LittleEndian(payloadBuffer[52..56], payload.Length); + BinaryPrimitives.WriteUInt64LittleEndian(payloadBuffer[56..64], 0); // reserved - payload.CopyTo(payloadBuffer[56..(56 + messageLength)]); + payload.CopyTo(payloadBuffer[64..(64 + messageLength)]); return payloadBuffer.ToArray(); } diff --git a/foreign/go/contracts/message_header.go b/foreign/go/contracts/message_header.go index 88afa0c99..7167f18d8 100644 --- a/foreign/go/contracts/message_header.go +++ b/foreign/go/contracts/message_header.go @@ -23,7 +23,7 @@ import ( "time" ) -const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4 +const MessageHeaderSize = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8 type MessageID [16]byte @@ -35,6 +35,7 @@ type MessageHeader struct { OriginTimestamp uint64 `json:"origin_timestamp"` UserHeaderLength uint32 `json:"user_header_length"` PayloadLength uint32 `json:"payload_length"` + Reserved uint64 `json:"reserved"` } func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength uint32) MessageHeader { @@ -49,7 +50,7 @@ func NewMessageHeader(id MessageID, payloadLength uint32, userHeaderLength uint3 func MessageHeaderFromBytes(data []byte) (*MessageHeader, error) { if len(data) != MessageHeaderSize { - return nil, errors.New("data has incorrect size, must be 56") + return nil, errors.New("data has incorrect size, must be 64") } checksum := binary.LittleEndian.Uint64(data[0:8]) id := data[8:24] @@ -58,6 +59,7 @@ func MessageHeaderFromBytes(data []byte) (*MessageHeader, error) { originTimestamp := binary.LittleEndian.Uint64(data[40:48]) userHeaderLength := binary.LittleEndian.Uint32(data[48:52]) payloadLength := binary.LittleEndian.Uint32(data[52:56]) + reserved := binary.LittleEndian.Uint64(data[56:64]) return &MessageHeader{ Checksum: checksum, @@ -67,6 +69,7 @@ func MessageHeaderFromBytes(data []byte) (*MessageHeader, error) { OriginTimestamp: originTimestamp, UserHeaderLength: userHeaderLength, PayloadLength: payloadLength, + Reserved: reserved, }, nil } @@ -81,6 +84,7 @@ func (mh *MessageHeader) ToBytes() []byte { bytes = binary.LittleEndian.AppendUint64(bytes, mh.OriginTimestamp) bytes = binary.LittleEndian.AppendUint32(bytes, mh.UserHeaderLength) bytes = binary.LittleEndian.AppendUint32(bytes, mh.PayloadLength) + bytes = binary.LittleEndian.AppendUint64(bytes, mh.Reserved) return bytes } diff --git a/foreign/java/gradle.properties b/foreign/java/gradle.properties index 2942656f9..cbbe7abd4 100644 --- a/foreign/java/gradle.properties +++ b/foreign/java/gradle.properties @@ -17,5 +17,5 @@ # under the License. # -version=0.6.2-SNAPSHOT +version=0.6.3-SNAPSHOT group=org.apache.iggy diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java index 144103d10..d781105ab 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java @@ -68,7 +68,8 @@ public record Message( BigInteger.ZERO, BigInteger.ZERO, userHeadersLength, - (long) payloadBytes.length); + (long) payloadBytes.length, + BigInteger.ZERO); return new Message(msgHeader, payloadBytes, userHeaders); } @@ -82,7 +83,8 @@ public record Message( header.timestamp(), header.originTimestamp(), userHeadersLength, - (long) payload.length); + (long) payload.length, + header.reserved()); return new Message(updatedHeader, payload, mergedHeaders); } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java index a0867988f..85f5868f1 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java @@ -28,7 +28,7 @@ public record MessageHeader( BigInteger timestamp, BigInteger originTimestamp, Long userHeadersLength, - Long payloadLength) { - - public static final int SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4; + Long payloadLength, + BigInteger reserved) { + public static final int SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8; } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java index fcd16ca55..3fb1c843f 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java @@ -193,8 +193,9 @@ public final class BytesDeserializer { var originTimestamp = readU64AsBigInteger(response); var userHeadersLength = response.readUnsignedIntLE(); var payloadLength = response.readUnsignedIntLE(); - var header = - new MessageHeader(checksum, id, offset, timestamp, originTimestamp, userHeadersLength, payloadLength); + var reserved = readU64AsBigInteger(response); + var header = new MessageHeader( + checksum, id, offset, timestamp, originTimestamp, userHeadersLength, payloadLength, reserved); var payload = newByteArray(payloadLength); response.readBytes(payload); Map<HeaderKey, HeaderValue> userHeaders = new HashMap<>(); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java index 20301ecdb..c9c317189 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java @@ -99,6 +99,7 @@ public final class BytesSerializer { buffer.writeBytes(toBytesAsU64(header.originTimestamp())); buffer.writeIntLE(header.userHeadersLength().intValue()); buffer.writeIntLE(header.payloadLength().intValue()); + buffer.writeBytes(toBytesAsU64(header.reserved())); return buffer; } diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java index 3453f1863..13893f040 100644 --- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java @@ -505,7 +505,8 @@ class BytesSerializerTest { BigInteger.valueOf(1000), // timestamp BigInteger.valueOf(1000), // originTimestamp 0L, // userHeadersLength - 5L // payloadLength + 5L, // payloadLength + BigInteger.ZERO // reserved ); byte[] payload = "hello".getBytes(); var message = new Message(header, payload, new HashMap<>()); @@ -535,7 +536,8 @@ class BytesSerializerTest { BigInteger.valueOf(1000), BigInteger.valueOf(1000), (long) userHeadersLength, - 3L // "abc".length() + 3L, // "abc".length() + BigInteger.ZERO // reserved ); byte[] payload = "abc".getBytes(); var message = new Message(header, payload, userHeaders); @@ -562,7 +564,8 @@ class BytesSerializerTest { BigInteger.valueOf(2000), // timestamp BigInteger.valueOf(1999), // originTimestamp 10L, // userHeadersLength - 100L // payloadLength + 100L, // payloadLength + BigInteger.ZERO // reserved ); // when diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java index 36a6d5ea3..c9b570ac0 100644 --- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java @@ -341,6 +341,7 @@ class BytesDeserializerTest { writeU64(buffer, BigInteger.valueOf(1000)); // origin timestamp buffer.writeIntLE(0); // user headers length buffer.writeIntLE(5); // payload length + writeU64(buffer, BigInteger.ZERO); // reserved buffer.writeBytes("hello".getBytes()); // payload // when @@ -374,6 +375,7 @@ class BytesDeserializerTest { buffer.writeIntLE(headersBuffer.readableBytes()); // user headers length buffer.writeIntLE(3); // payload length + writeU64(buffer, BigInteger.ZERO); // reserved buffer.writeBytes("abc".getBytes()); // payload buffer.writeBytes(headersBuffer); // user headers @@ -401,6 +403,7 @@ class BytesDeserializerTest { writeU64(buffer, BigInteger.valueOf(1000)); buffer.writeIntLE(0); buffer.writeIntLE(2); + writeU64(buffer, BigInteger.ZERO); // reserved buffer.writeBytes("hi".getBytes()); // when @@ -775,7 +778,8 @@ class BytesDeserializerTest { "timestamp": 0, "origin_timestamp": 1000, "user_headers_length": 0, - "payload_length": 4 + "payload_length": 4, + "reserved": 0 }, "payload": "dGVzdA==", "user_headers": [] @@ -807,7 +811,8 @@ class BytesDeserializerTest { "timestamp": 0, "origin_timestamp": 1000, "user_headers_length": 62, - "payload_length": 4 + "payload_length": 4, + "reserved": 0 }, "payload": "dGVzdA==", "user_headers": [ diff --git a/foreign/node/package-lock.json b/foreign/node/package-lock.json index a8b396898..6ca584d62 100644 --- a/foreign/node/package-lock.json +++ b/foreign/node/package-lock.json @@ -1,12 +1,12 @@ { "name": "apache-iggy", - "version": "0.6.1-edge.1", + "version": "0.6.3-edge.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "apache-iggy", - "version": "0.6.1-edge.1", + "version": "0.6.3-edge.1", "license": "Apache-2.0", "dependencies": { "debug": "4.4.3", diff --git a/foreign/node/package.json b/foreign/node/package.json index e04752b5a..f339c4225 100644 --- a/foreign/node/package.json +++ b/foreign/node/package.json @@ -1,7 +1,7 @@ { "name": "apache-iggy", "type": "module", - "version": "0.6.2-edge.1", + "version": "0.6.3-edge.1", "description": "Official Apache Iggy NodeJS SDK", "keywords": [ "iggy", diff --git a/foreign/node/src/wire/message/iggy-header.utils.ts b/foreign/node/src/wire/message/iggy-header.utils.ts index d26ef4dec..b6bd0bf54 100644 --- a/foreign/node/src/wire/message/iggy-header.utils.ts +++ b/foreign/node/src/wire/message/iggy-header.utils.ts @@ -20,32 +20,33 @@ import { toDate } from "../serialize.utils.js"; import { u128LEBufToBigint } from "../number.utils.js"; - /** * Iggy message header containing metadata for each message. */ export type IggyMessageHeader = { /** Message checksum for integrity verification */ - checksum: bigint, + checksum: bigint; /** Unique message identifier (UUID or numeric) */ - id: string | bigint, + id: string | bigint; /** Message offset within the partition */ - offset: bigint, + offset: bigint; /** Server-assigned timestamp */ - timestamp: Date, + timestamp: Date; /** Client-provided origin timestamp */ - originTimestamp: Date, + originTimestamp: Date; /** Length of user-defined headers in bytes */ - userHeadersLength: number, + userHeadersLength: number; /** Length of message payload in bytes */ - payloadLength: number + payloadLength: number; + /** Reserved for future use */ + reserved: bigint; }; /** * Size of the Iggy message header in bytes. - * Layout: u64 (checksum) + u128 (id) + u64 (offset) + u64 (timestamp) + u64 (originTimestamp) + u32 (userHeadersLength) + u32 (payloadLength) + * Layout: u64 (checksum) + u128 (id) + u64 (offset) + u64 (timestamp) + u64 (originTimestamp) + u32 (userHeadersLength) + u32 (payloadLength) + u64 (reserved) */ -export const IGGY_MESSAGE_HEADER_SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4; +export const IGGY_MESSAGE_HEADER_SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4 + 8; /** * Serializes an Iggy message header to wire format. @@ -59,7 +60,7 @@ export const IGGY_MESSAGE_HEADER_SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4; export const serializeIggyMessageHeader = ( id: Buffer, payload: Buffer, - userHeaders: Buffer + userHeaders: Buffer, ) => { const b = Buffer.allocUnsafe(IGGY_MESSAGE_HEADER_SIZE); b.writeBigUInt64LE(0n, 0); // checksum u64 @@ -68,7 +69,8 @@ export const serializeIggyMessageHeader = ( b.writeBigUInt64LE(0n, 32); // timestamp u64 b.writeBigUint64LE(BigInt(new Date().getTime()), 40); // originTimestamp u64 b.writeUInt32LE(userHeaders.length, 48); // userHeaders len u32 - b.writeUInt32LE(payload.length, 52) // payload len u32 + b.writeUInt32LE(payload.length, 52); // payload len u32 + b.writeBigUInt64LE(0n, 56); // reserved u64 return b; }; @@ -88,10 +90,10 @@ export const deserialiseMessageId = (b: Buffer) => u128LEBufToBigint(b); * @throws Error if buffer length doesn't match expected header size */ export const deserializeIggyMessageHeaders = (b: Buffer) => { - if(b.length !== IGGY_MESSAGE_HEADER_SIZE) + if (b.length !== IGGY_MESSAGE_HEADER_SIZE) throw new Error( `deserialize message headers error, length = ${b.length} ` + - `expected ${IGGY_MESSAGE_HEADER_SIZE}` + `expected ${IGGY_MESSAGE_HEADER_SIZE}`, ); const headers: IggyMessageHeader = { checksum: b.readBigUInt64LE(0), @@ -100,7 +102,8 @@ export const deserializeIggyMessageHeaders = (b: Buffer) => { timestamp: toDate(b.readBigUInt64LE(32)), originTimestamp: toDate(b.readBigUInt64LE(40)), userHeadersLength: b.readUInt32LE(48), - payloadLength: b.readUInt32LE(52) - } + payloadLength: b.readUInt32LE(52), + reserved: b.readBigUInt64LE(56), + }; return headers; }; diff --git a/foreign/node/src/wire/message/send-messages.command.test.ts b/foreign/node/src/wire/message/send-messages.command.test.ts index 5538858fa..9caf2ce1e 100644 --- a/foreign/node/src/wire/message/send-messages.command.test.ts +++ b/foreign/node/src/wire/message/send-messages.command.test.ts @@ -40,7 +40,7 @@ describe("SendMessages", () => { }; it("serialize SendMessages into a buffer", () => { - assert.deepEqual(SEND_MESSAGES.serialize(t1).length, 533); + assert.deepEqual(SEND_MESSAGES.serialize(t1).length, 589); }); it("serialize all kinds of messageId", () => { diff --git a/foreign/python/Cargo.toml b/foreign/python/Cargo.toml index dc398e1d4..ee1094cda 100644 --- a/foreign/python/Cargo.toml +++ b/foreign/python/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "apache-iggy" -version = "0.6.2-dev1" +version = "0.6.3-dev1" edition = "2021" authors = [ "Dario Lencina Talarico <[email protected]>", @@ -31,7 +31,7 @@ repository = "https://github.com/apache/iggy" [dependencies] bytes = "1.11.0" futures = "0.3.31" -iggy = { path = "../../core/sdk", version = "0.8.2-edge.2" } +iggy = { path = "../../core/sdk", version = "0.8.3-edge.1" } pyo3 = "0.27.2" pyo3-async-runtimes = { version = "0.27.0", features = [ "attributes",
