This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 00ab99e4 finish send/poll messages
00ab99e4 is described below
commit 00ab99e410873ce008d58aa6128687d2a3f1a7be
Author: numinex <[email protected]>
AuthorDate: Sun Jun 29 10:01:36 2025 +0200
finish send/poll messages
---
.asf.yaml | 5 +
core/common/src/error/iggy_error.rs | 5 +
core/common/src/lib.rs | 1 -
core/common/src/types/confirmation/mod.rs | 53 ---------
core/common/src/types/mod.rs | 1 -
.../tests/streaming/get_by_timestamp.rs | 5 +-
core/integration/tests/streaming/topic_messages.rs | 5 +-
core/sdk/src/prelude.rs | 2 +-
.../handlers/messages/poll_messages_handler.rs | 128 +++++++++++++++++----
.../handlers/messages/send_messages_handler.rs | 111 +++++++++++++++---
core/server/src/configs/defaults.rs | 6 -
core/server/src/configs/displays.rs | 3 +-
core/server/src/configs/system.rs | 3 -
core/server/src/shard/mod.rs | 80 ++++++++++++-
core/server/src/shard/namespace.rs | 11 +-
core/server/src/shard/system/messages.rs | 52 ++-------
core/server/src/shard/system/partitions.rs | 15 ++-
core/server/src/shard/system/streams.rs | 9 ++
core/server/src/shard/system/topics.rs | 24 +++-
core/server/src/shard/transmission/event.rs | 60 ++++++++++
core/server/src/shard/transmission/frame.rs | 25 ++--
core/server/src/shard/transmission/message.rs | 28 ++++-
core/server/src/shard/transmission/mod.rs | 1 +
core/server/src/streaming/partitions/messages.rs | 20 +---
core/server/src/streaming/partitions/partition.rs | 8 +-
.../streaming/segments/messages/messages_reader.rs | 31 +++--
.../streaming/segments/messages/messages_writer.rs | 2 +-
core/server/src/streaming/streams/storage.rs | 14 +--
core/server/src/streaming/streams/topics.rs | 14 ++-
.../server/src/streaming/topics/consumer_groups.rs | 6 +-
core/server/src/streaming/topics/messages.rs | 55 +++++----
core/server/src/streaming/topics/partitions.rs | 14 +--
core/server/src/streaming/topics/storage.rs | 6 +-
core/server/src/streaming/topics/topic.rs | 26 +++--
34 files changed, 535 insertions(+), 294 deletions(-)
diff --git a/.asf.yaml b/.asf.yaml
index ca043bb7..d9ca3e85 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -43,6 +43,11 @@ github:
dismiss_stale_reviews: true
required_approving_review_count: 2
required_conversation_resolution: true
+ io_uring_tpc:
+ required_pull_request_reviews:
+ dismiss_stale_reviews: true
+ required_approving_review_count: 1
+ required_conversation_resolution: true
custom_subjects:
new_discussion: "{title}"
edit_discussion: "Re: {title}"
diff --git a/core/common/src/error/iggy_error.rs
b/core/common/src/error/iggy_error.rs
index cc8a284b..334e14c7 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -459,6 +459,11 @@ pub enum IggyError {
CannotReadIndexPosition = 10011,
#[error("Cannot read index timestamp")]
CannotReadIndexTimestamp = 10012,
+
+ #[error("Shard not found for stream ID: {0}, topic ID: {1}, partition ID:
{2}")]
+ ShardNotFound(u32, u32, u32) = 11000,
+ #[error("Shard communication error, shard ID: {0}")]
+ ShardCommunicationError(u16) = 11001,
}
impl IggyError {
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 6603a7d5..f055476e 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -64,7 +64,6 @@ pub use
types::configuration::tcp_config::tcp_client_config::*;
pub use types::configuration::tcp_config::tcp_client_config_builder::*;
pub use types::configuration::tcp_config::tcp_client_reconnection_config::*;
pub use types::configuration::tcp_config::tcp_connection_string_options::*;
-pub use types::confirmation::*;
pub use types::consumer::consumer_group::*;
pub use types::consumer::consumer_kind::*;
pub use types::consumer::consumer_offset_info::*;
diff --git a/core/common/src/types/confirmation/mod.rs
b/core/common/src/types/confirmation/mod.rs
deleted file mode 100644
index bc26cffa..00000000
--- a/core/common/src/types/confirmation/mod.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use serde::{Deserialize, Serialize};
-use strum::{Display, EnumString};
-
-#[derive(Clone, Copy, Debug, Default, Display, Serialize, Deserialize,
EnumString, PartialEq)]
-#[strum(serialize_all = "snake_case")]
-pub enum Confirmation {
- #[default]
- Wait,
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use std::str::FromStr;
-
- #[test]
- fn test_to_string() {
- assert_eq!(Confirmation::Wait.to_string(), "wait");
- assert_eq!(Confirmation::NoWait.to_string(), "no_wait");
- }
-
- #[test]
- fn test_from_str() {
- assert_eq!(Confirmation::from_str("wait").unwrap(),
Confirmation::Wait);
- assert_eq!(
- Confirmation::from_str("no_wait").unwrap(),
- Confirmation::NoWait
- );
- }
-
- #[test]
- fn test_default() {
- assert_eq!(Confirmation::default(), Confirmation::Wait);
- }
-}
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 2376abd6..6940f065 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -21,7 +21,6 @@ pub(crate) mod client_state;
pub(crate) mod command;
pub(crate) mod compression;
pub(crate) mod configuration;
-pub(crate) mod confirmation;
pub(crate) mod consumer;
pub(crate) mod diagnostic;
pub(crate) mod identifier;
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
index d0cc2e5b..93a178aa 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -202,10 +202,7 @@ async fn test_get_messages_by_timestamp(
let batch =
IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
assert_eq!(batch.count(), batch_len);
- partition
- .append_messages(batch)
- .await
- .unwrap();
+ partition.append_messages(batch).await.unwrap();
// Capture the timestamp of this batch
batch_timestamps.push(IggyTimestamp::now());
diff --git a/core/integration/tests/streaming/topic_messages.rs
b/core/integration/tests/streaming/topic_messages.rs
index f21263c4..19de255f 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -62,10 +62,7 @@ async fn assert_polling_messages() {
.map(|m| m.get_size_bytes())
.sum::<IggyByteSize>();
let batch = IggyMessagesBatchMut::from_messages(&messages,
batch_size.as_bytes_u32());
- topic
- .append_messages(&partitioning, batch)
- .await
- .unwrap();
+ topic.append_messages(&partitioning, batch).await.unwrap();
let consumer = PollingConsumer::Consumer(1, partition_id);
let (_, polled_messages) = topic
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 99286fac..2e82e462 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -50,7 +50,7 @@ pub use iggy_binary_protocol::{
};
pub use iggy_common::{
Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable,
CacheMetrics,
- CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm,
Confirmation, Consumer,
+ CacheMetricsKey, ClientError, ClientInfoDetails, CompressionAlgorithm,
Consumer,
ConsumerGroupDetails, ConsumerKind, EncryptorKind, FlushUnsavedBuffer,
GlobalPermissions,
HeaderKey, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind,
Identifier,
IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry,
IggyIndexView, IggyMessage,
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 9d6d526b..c590dcf2 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,8 +20,12 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::messages::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::{IggyShard, ShardRequestResult};
use crate::shard::system::messages::PollingArgs;
+use crate::streaming::segments::IggyMessagesBatchSet;
use crate::streaming::session::Session;
use crate::to_iovec;
use anyhow::Result;
@@ -60,48 +64,128 @@ impl ServerCommandHandler for PollMessages {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
- let (metadata, messages) = shard
- .poll_messages(
- session,
- &self.consumer,
- &self.stream_id,
- &self.topic_id,
- self.partition_id,
- PollingArgs::new(self.strategy, self.count, self.auto_commit),
- )
- .await
+ let stream = shard
+ .get_stream(&self.stream_id)
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - stream not found for
stream ID: {}",
+ self.stream_id
+ )
+ })?;
+ let topic = stream.get_topic(
+ &self.topic_id,
+ ).with_error_context(|error| format!(
+ "{COMPONENT} (error: {error}) - topic not found for stream ID: {},
topic_id: {}",
+ self.stream_id, self.topic_id
+ ))?;
+
+ let PollMessages {
+ consumer,
+ partition_id,
+ strategy,
+ count,
+ auto_commit,
+ ..
+ } = self;
+ let args = PollingArgs::new(strategy, count, auto_commit);
+
+ shard.permissioner
+ .borrow()
+ .poll_messages(session.get_user_id(), topic.stream_id,
topic.topic_id)
.with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - failed to poll messages for
consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session:
{session}.",
- self.consumer, self.stream_id, self.topic_id, self.partition_id
+ "{COMPONENT} (error: {error}) - permission denied to poll
messages for user {} on stream ID: {}, topic ID: {}",
+ session.get_user_id(),
+ topic.stream_id,
+ topic.topic_id
))?;
+ if !topic.has_partitions() {
+ return Err(IggyError::NoPartitions(topic.topic_id,
topic.stream_id));
+ }
+
+ // There might be no partition assigned, if it's the consumer group
member without any partitions.
+ let Some((consumer, partition_id)) = topic
+ .resolve_consumer_with_partition_id(&consumer, session.client_id,
partition_id, true)
+ .with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer with partition id, consumer: {consumer}, client
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
+ todo!("Send early response");
+ //return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
+ };
+
+ let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
+ let request = ShardRequest::PollMessages {
+ consumer,
+ partition_id,
+ args,
+ count,
+ };
+ let message = ShardMessage::Request(request);
+ let (metadata, batch) = match shard.send_request_to_shard(&namespace,
message).await {
+ ShardRequestResult::SameShard(message) => {
+ match message {
+ ShardMessage::Request(request) => {
+ match request {
+ ShardRequest::PollMessages {
+ consumer,
+ partition_id,
+ args,
+ count,
+ } => {
+ topic.get_messages(consumer, partition_id,
args.strategy, count).await?
+
+ }
+ _ => unreachable!(
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
+ ),
+ }
+ }
+ _ => unreachable!(
+ "Expected a request message inside of an command
handler, impossible state"
+ ),
+ }
+ }
+ ShardRequestResult::Result(result) => {
+ match result? {
+ ShardResponse::PollMessages(response) => {
+ response
+ }
+ ShardResponse::ErrorResponse(err) => {
+ return Err(err);
+ }
+ _ => unreachable!(
+ "Expected a PollMessages response inside of
PollMessages handler, impossible state"
+ ),
+ }
+ }
+ };
+
+
// Collect all chunks first into a Vec to extend their lifetimes.
// This ensures the Bytes (in reality Arc<[u8]>) references from each
IggyMessagesBatch stay alive
// throughout the async vectored I/O operation, preventing "borrowed
value does not live
// long enough" errors while optimizing transmission by using larger
chunks.
// 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for
messages_count + size of all batches.
- let response_length = 4 + 8 + 4 + messages.size();
+ let response_length = 4 + 8 + 4 + batch.size();
let response_length_bytes = response_length.to_le_bytes();
let partition_id = metadata.partition_id.to_le_bytes();
let current_offset = metadata.current_offset.to_le_bytes();
- let count = messages.count().to_le_bytes();
+ let count = batch.count().to_le_bytes();
- let mut io_slices = Vec::with_capacity(messages.containers_count() +
3);
- io_slices.push(to_iovec(&partition_id));
- io_slices.push(to_iovec(&current_offset));
- io_slices.push(to_iovec(&count));
+ let mut iovecs = Vec::with_capacity(batch.containers_count() + 3);
+ iovecs.push(to_iovec(&partition_id));
+ iovecs.push(to_iovec(&current_offset));
+ iovecs.push(to_iovec(&count));
- io_slices.extend(messages.iter().map(|m| to_iovec(&m)));
+ iovecs.extend(batch.iter().map(|m| to_iovec(&m)));
trace!(
"Sending {} messages to client ({} bytes) to client",
- messages.count(),
+ batch.count(),
response_length
);
sender
- .send_ok_response_vectored(&response_length_bytes, io_slices)
+ .send_ok_response_vectored(&response_length_bytes, iovecs)
.await?;
Ok(())
}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index d9a4770f..61e2c39c 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -16,19 +16,25 @@
* under the License.
*/
+use super::COMPONENT;
use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
use crate::binary::sender::SenderKind;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::{IggyShard, ShardRequestResult};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
use crate::streaming::utils::PooledBuffer;
use anyhow::Result;
use bytes::BytesMut;
-use iggy_common::INDEX_SIZE;
+use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::Sizeable;
+use iggy_common::{INDEX_SIZE, IdKind};
use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
use std::rc::Rc;
+use std::sync::Arc;
use tracing::instrument;
impl ServerCommandHandler for SendMessages {
@@ -104,19 +110,98 @@ impl ServerCommandHandler for SendMessages {
indexes,
messages_buffer,
);
-
batch.validate()?;
- shard
- .append_messages(
- session,
- &self.stream_id,
- &self.topic_id,
- &self.partitioning,
- batch,
- )
- .await?;
-
+ let stream = shard
+ .get_stream(&self.stream_id)
+ .with_error_context(|error| {
+ format!(
+ "Failed to get stream with ID: {} (error: {})",
+ self.stream_id, error
+ )
+ })?;
+ let topic = stream
+ .get_topic(&self.topic_id)
+ .with_error_context(|error| {
+ format!(
+ "Failed to get topic with ID: {} (error: {})",
+ self.topic_id, error
+ )
+ })?;
+ let stream_id = stream.stream_id;
+ let topic_id = topic.topic_id;
+ let partition_id = topic.calculate_partition_id(&self.partitioning)?;
+
+ // Validate permissions for given user on stream and topic.
+ shard.permissioner.borrow().append_messages(
+ session.get_user_id(),
+ stream_id,
+ topic_id
+ ).with_error_context(|error| format!(
+ "{COMPONENT} (error: {error}) - permission denied to append
messages for user {} on stream ID: {}, topic ID: {}",
+ session.get_user_id(),
+ stream_id,
+ topic_id
+ ))?;
+ let messages_count = batch.count();
+ shard.metrics.increment_messages(messages_count as u64);
+
+ // Encrypt messages if encryptor is enabled in configuration.
+ let batch = shard.maybe_encrypt_messages(batch)?;
+
+ let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
+ let request = ShardRequest::SendMessages {
+ stream_id,
+ topic_id,
+ partition_id,
+ batch,
+ };
+ let message = ShardMessage::Request(request);
+ // Egh... I don't like those nested match statements,
+ // Technically there is only two `request` types that will ever be
dispatched
+ // to different shards, thus we could get away with generic structs
+ // maybe todo ?
+ // how to make this code reusable, in a sense that we will have
exactly the same code inside of
+ // `PollMessages` handler, but with different request....
+ match shard.send_request_to_shard(&namespace, message).await {
+ ShardRequestResult::SameShard(message) => {
+ match message {
+ ShardMessage::Request(request) => {
+ match request {
+ ShardRequest::SendMessages {
+ stream_id,
+ topic_id,
+ partition_id,
+ batch,
+ } => {
+ // Just shut up rust analyzer.
+ let _stream_id = stream_id;
+ let _topic_id = topic_id;
+ topic.append_messages(partition_id,
batch).await?
+ }
+ _ => unreachable!(
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
+ ),
+ }
+ }
+ _ => unreachable!(
+ "Expected a request message inside of an command
handler, impossible state"
+ ),
+ }
+ }
+ ShardRequestResult::Result(result) => {
+ match result? {
+ ShardResponse::SendMessages => {
+ ()
+ }
+ ShardResponse::ErrorResponse(err) => {
+ return Err(err);
+ }
+ _ => unreachable!("Expected a SendMessages response inside
of SendMessages handler, impossible state"),
+ }
+
+ }
+ };
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git a/core/server/src/configs/defaults.rs
b/core/server/src/configs/defaults.rs
index 82b560cf..dd74ade7 100644
--- a/core/server/src/configs/defaults.rs
+++ b/core/server/src/configs/defaults.rs
@@ -451,12 +451,6 @@ impl Default for SegmentConfig {
cache_indexes:
SERVER_CONFIG.system.segment.cache_indexes.parse().unwrap(),
message_expiry:
SERVER_CONFIG.system.segment.message_expiry.parse().unwrap(),
archive_expired: SERVER_CONFIG.system.segment.archive_expired,
- server_confirmation: SERVER_CONFIG
- .system
- .segment
- .server_confirmation
- .parse()
- .unwrap(),
}
}
}
diff --git a/core/server/src/configs/displays.rs
b/core/server/src/configs/displays.rs
index de4805a8..51873421 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -289,12 +289,11 @@ impl Display for SegmentConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {},
archive_expired: {}, server_confirmation: {} }}",
+ "{{ size_bytes: {}, cache_indexes: {}, message_expiry: {},
archive_expired: {} }}",
self.size,
self.cache_indexes,
self.message_expiry,
self.archive_expired,
- self.server_confirmation,
)
}
}
diff --git a/core/server/src/configs/system.rs
b/core/server/src/configs/system.rs
index fc263a26..a5193294 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -17,7 +17,6 @@
*/
use super::cache_indexes::CacheIndexesConfig;
-use iggy_common::Confirmation;
use iggy_common::IggyByteSize;
use iggy_common::IggyExpiry;
use iggy_common::MaxTopicSize;
@@ -141,8 +140,6 @@ pub struct SegmentConfig {
#[serde_as(as = "DisplayFromStr")]
pub message_expiry: IggyExpiry,
pub archive_expired: bool,
- #[serde_as(as = "DisplayFromStr")]
- pub server_confirmation: Confirmation,
}
#[serde_as]
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index dbd94b68..7976cca3 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -47,14 +47,12 @@ use crate::{
shard::{
system::info::SystemInfo,
transmission::{
- frame::ShardFrame,
- message::{ShardEvent, ShardMessage},
+ frame::{ShardFrame, ShardResponse},
+ message::{ShardEvent, ShardMessage, ShardRequest},
},
},
state::{
- StateKind,
- file::FileState,
- system::{StreamState, SystemState, UserState},
+ file::FileState, system::{StreamState, SystemState, UserState},
StateKind
},
streaming::{
clients::client_manager::ClientManager,
@@ -85,12 +83,37 @@ impl Shard {
connection,
}
}
+
+ pub async fn send_request(&self, message: ShardMessage) ->
Result<ShardResponse, IggyError> {
+ let (sender, receiver) = async_channel::bounded(1);
+ self.connection
+ .sender
+ .send(ShardFrame::new(message, Some(sender.clone()))); //
Apparently sender needs to be cloned, otherwise channel will close...
+ //TODO: Fixme
+ let response = receiver.recv().await
+ .map_err(|err| {
+ error!("Failed to receive response from shard: {err}");
+ IggyError::ShardCommunicationError(self.id)
+ })?;
+ Ok(response)
+ }
}
struct ShardInfo {
id: u16,
}
+impl ShardInfo {
+ pub fn new(id: u16) -> Self {
+ Self { id }
+ }
+}
+
+pub enum ShardRequestResult<T, E> {
+ SameShard(ShardMessage),
+ Result(Result<T, E>),
+}
+
pub struct IggyShard {
pub id: u16,
shards: Vec<Shard>,
@@ -426,6 +449,53 @@ impl IggyShard {
self.shards.len() as u32
}
+ pub async fn send_request_to_shard(
+ &self,
+ namespace: &IggyNamespace,
+ message: ShardMessage,
+ ) -> ShardRequestResult<ShardResponse, IggyError> {
+ if let Some(shard) = self.find_shard(namespace) {
+ if shard.id == self.id {
+ return ShardRequestResult::SameShard(message);
+ }
+
+ let response = match shard.send_request(message).await {
+ Ok(response) => response,
+ Err(err) => {
+ error!(
+ "{COMPONENT} - failed to send request to shard with
ID: {}, error: {err}",
+ shard.id
+ );
+ return ShardRequestResult::Result(Err(err));
+ }
+ };
+ ShardRequestResult::Result(Ok(response))
+ } else {
+ ShardRequestResult::Result(Err(IggyError::ShardNotFound(
+ namespace.stream_id,
+ namespace.topic_id,
+ namespace.partition_id,
+ )))
+ }
+ }
+
+ fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
+ let shards_table = self.shards_table.borrow();
+ shards_table.get(namespace).map(|shard_info| {
+ self.shards
+ .iter()
+ .find(|shard| shard.id == shard_info.id)
+ .expect("Shard not found in the shards table.")
+ })
+ }
+
+ pub fn insert_shard_table_records(
+ &self,
+ records: impl Iterator<Item = (IggyNamespace, ShardInfo)>,
+ ) {
+ self.shards_table.borrow_mut().extend(records);
+ }
+
pub fn broadcast_event_to_all_shards(&self, client_id: u32, event:
ShardEvent) {
self.shards
.iter()
diff --git a/core/server/src/shard/namespace.rs
b/core/server/src/shard/namespace.rs
index a7c899aa..6b3b5b7d 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -17,19 +17,18 @@
*/
use hash32::{Hasher, Murmur3Hasher};
-use iggy_common::Identifier;
use std::hash::Hasher as _;
//TODO: Will probably want to move it to separate crate so we can share it
with sdk.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct IggyNamespace {
- pub(crate) stream_id: Identifier,
- pub(crate) topic_id: Identifier,
+ pub(crate) stream_id: u32,
+ pub(crate) topic_id: u32,
pub(crate) partition_id: u32,
}
impl IggyNamespace {
- pub fn new(stream_id: Identifier, topic_id: Identifier, partition_id: u32)
-> Self {
+ pub fn new(stream_id: u32, topic_id: u32, partition_id: u32) -> Self {
Self {
stream_id,
topic_id,
@@ -39,8 +38,8 @@ impl IggyNamespace {
pub fn generate_hash(&self) -> u32 {
let mut hasher = Murmur3Hasher::default();
- hasher.write(&self.stream_id.value);
- hasher.write(&self.topic_id.value);
+ hasher.write_u32(self.stream_id);
+ hasher.write_u32(self.topic_id);
hasher.write_u32(self.partition_id);
hasher.finish32()
}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 9a23a9bb..76337a1a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,7 +25,7 @@ use crate::streaming::utils::PooledBuffer;
use async_zip::tokio::read::stream;
use error_set::ErrContext;
use iggy_common::{
- BytesSerializable, Confirmation, Consumer, EncryptorKind,
IGGY_MESSAGE_HEADER_SIZE, Identifier,
+ BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier,
IggyError, Partitioning, PollingStrategy,
};
use tracing::{error, trace};
@@ -98,47 +98,6 @@ impl IggyShard {
Ok((metadata, batch_set))
}
- pub async fn append_messages(
- &self,
- session: &Session,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partitioning: &Partitioning,
- messages: IggyMessagesBatchMut,
- ) -> Result<(), IggyError> {
- self.ensure_authenticated(session)?;
- let stream = self.get_stream(stream_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - stream not found for
stream_id: {stream_id}")
- })?;
- let stream_id = stream.stream_id;
- let topic = self.find_topic(session, &stream,
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) -
topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?;
- self.permissioner.borrow().append_messages(
- session.get_user_id(),
- topic.stream_id,
- topic.topic_id
- ).with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - permission denied to append
messages for user {} on stream ID: {}, topic ID: {}",
- session.get_user_id(),
- topic.stream_id,
- topic.topic_id
- ))?;
- let messages_count = messages.count();
-
- // Encrypt messages if encryptor is configured
- let messages = if let Some(encryptor) = &self.encryptor {
- self.encrypt_messages(messages, encryptor)?
- } else {
- messages
- };
-
- topic
- .append_messages(partitioning, messages)
- .await?;
-
- self.metrics.increment_messages(messages_count as u64);
- Ok(())
- }
-
pub async fn flush_unsaved_buffer(
&self,
session: &Session,
@@ -167,7 +126,7 @@ impl IggyShard {
Ok(())
}
- async fn decrypt_messages(
+ pub async fn decrypt_messages(
&self,
batches: IggyMessagesBatchSet,
encryptor: &EncryptorKind,
@@ -206,11 +165,14 @@ impl IggyShard {
Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
}
- fn encrypt_messages(
+ pub fn maybe_encrypt_messages(
&self,
batch: IggyMessagesBatchMut,
- encryptor: &EncryptorKind,
) -> Result<IggyMessagesBatchMut, IggyError> {
+ let encryptor = match self.encryptor.as_ref() {
+ Some(encryptor) => encryptor,
+ None => return Ok(batch),
+ };
let mut encrypted_messages = PooledBuffer::with_capacity(batch.size()
as usize * 2);
let count = batch.count();
let mut indexes = IggyIndexesMut::with_capacity(batch.count() as
usize, 0);
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 01e02968..78ef2144 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -18,6 +18,8 @@
use super::COMPONENT;
use crate::shard::IggyShard;
+use crate::shard::ShardInfo;
+use crate::shard::namespace::IggyNamespace;
use crate::streaming::session::Session;
use error_set::ErrContext;
use iggy_common::Identifier;
@@ -54,6 +56,7 @@ impl IggyShard {
let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
})?;
+ let stream_id = stream.stream_id;
let topic = stream
.get_topic_mut(topic_id)
.with_error_context(|error| {
@@ -61,15 +64,25 @@ impl IggyShard {
"{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with id: {stream_id}"
)
})?;
+ let topic_id = topic.topic_id;
// TODO: Make add persisted partitions to topic sync, and extract the
storage persister out of it
// perform disk i/o outside of the borrow_mut of the stream.
- topic
+ let partition_ids = topic
.add_persisted_partitions(partitions_count)
.await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to add
persisted partitions, topic: {topic}")
})?;
+ let records = partition_ids.into_iter().map(|partition_id| {
+ let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
+ let hash = namespace.generate_hash();
+ let shard_id = hash % self.get_available_shards_count();
+ let shard_info = ShardInfo::new(shard_id as u16);
+ (namespace, shard_info)
+ });
+ self.insert_shard_table_records(records);
+
topic.reassign_consumer_groups();
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 3cb0da5b..fecec536 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -112,6 +112,15 @@ impl IggyShard {
}
}
+ pub fn try_get_topic_id(&self, stream_id: &Identifier, name: &str) ->
Option<u32> {
+ let stream = self.get_stream(stream_id).ok()?;
+ stream.topics_ids.get(name).and_then(|id| Some(*id))
+ }
+
+ pub fn try_get_stream_id(&self, name: &str) -> Option<u32> {
+ self.streams_ids.borrow().get(name).and_then(|id| Some(*id))
+ }
+
fn try_get_stream_by_name(&self, name: &str) -> Option<Ref<'_, Stream>> {
self.streams_ids
.borrow()
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 477308bc..252dae30 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -17,7 +17,8 @@
*/
use super::COMPONENT;
-use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::{IggyShard, ShardInfo};
use crate::streaming::session::Session;
use crate::streaming::streams::stream::Stream;
use crate::streaming::topics::topic::Topic;
@@ -131,8 +132,11 @@ impl IggyShard {
// TODO: Make create topic sync, and extract the storage persister out
of it
// perform disk i/o outside of the borrow_mut of the stream.
- let created_topic_id = self
- .get_stream_mut(stream_id)?
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let (topic_id, partition_ids) = stream
.create_topic(
topic_id,
name,
@@ -146,12 +150,24 @@ impl IggyShard {
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to create topic
with name: {name} in stream ID: {stream_id}")
})?;
+ let records = partition_ids.into_iter().map(|partition_id| {
+ let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
+ // TODO: This setup isn't deterministic.
+ // Imagine a scenario where client creates partition using
`String` identifiers,
+ // but then for poll_messages requests uses numeric ones.
+ // the namespace wouldn't match, therefore we would get miss in
the shard table.
+ let hash = namespace.generate_hash();
+ let shard_id = hash % self.get_available_shards_count();
+ let shard_info = ShardInfo::new(shard_id as u16);
+ (namespace, shard_info)
+ });
+ self.insert_shard_table_records(records);
self.metrics.increment_topics(1);
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
- Ok(Identifier::numeric(created_topic_id)?)
+ Ok(Identifier::numeric(topic_id)?)
}
#[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
new file mode 100644
index 00000000..45815a50
--- /dev/null
+++ b/core/server/src/shard/transmission/event.rs
@@ -0,0 +1,60 @@
+use std::net::SocketAddr;
+
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+
+use crate::streaming::clients::client_manager::Transport;
+
+pub enum ShardEvent {
+ CreatedStream {
+ stream_id: Option<u32>,
+ topic_id: Identifier
+ },
+ //DeletedStream(Identifier),
+ //UpdatedStream(Identifier, String),
+ //PurgedStream(Identifier),
+ //CreatedPartitions(Identifier, Identifier, u32),
+ //DeletedPartitions(Identifier, Identifier, u32),
+ CreatedTopic(
+ Identifier,
+ Option<u32>,
+ String,
+ u32,
+ IggyExpiry,
+ CompressionAlgorithm,
+ MaxTopicSize,
+ Option<u8>,
+ ),
+ //CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
+ //DeletedConsumerGroup(Identifier, Identifier, Identifier),
+ /*
+ UpdatedTopic(
+ Identifier,
+ Identifier,
+ String,
+ IggyExpiry,
+ CompressionAlgorithm,
+ MaxTopicSize,
+ Option<u8>,
+ ),
+ */
+ //PurgedTopic(Identifier, Identifier),
+ //DeletedTopic(Identifier, Identifier),
+ //CreatedUser(String, String, UserStatus, Option<Permissions>),
+ //DeletedUser(Identifier),
+ LoginUser {
+ username: String,
+ password: String,
+ },
+ //LogoutUser,
+ //UpdatedUser(Identifier, Option<String>, Option<UserStatus>),
+ //ChangedPassword(Identifier, String, String),
+ //CreatedPersonalAccessToken(String, IggyExpiry),
+ //DeletedPersonalAccessToken(String),
+ //LoginWithPersonalAccessToken(String),
+ //StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64),
+ NewSession {
+ user_id: u32,
+ socket_addr: SocketAddr,
+ transport: Transport,
+ },
+}
\ No newline at end of file
diff --git a/core/server/src/shard/transmission/frame.rs
b/core/server/src/shard/transmission/frame.rs
index ec63daf0..ff519ab4 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -16,38 +16,33 @@
* under the License.
*/
use async_channel::Sender;
-use bytes::Bytes;
use iggy_common::IggyError;
-use crate::shard::transmission::message::ShardMessage;
+use
crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
shard::transmission::message::ShardMessage,
streaming::segments::IggyMessagesBatchSet};
+
+#[derive(Debug)]
+pub enum ShardResponse {
+ PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
+ SendMessages,
+ ShardEvent,
+ ErrorResponse(IggyError),
+}
#[derive(Debug)]
pub struct ShardFrame {
- pub client_id: u32,
pub message: ShardMessage,
pub response_sender: Option<Sender<ShardResponse>>,
}
impl ShardFrame {
- pub fn new(
- client_id: u32,
- message: ShardMessage,
- response_sender: Option<Sender<ShardResponse>>,
- ) -> Self {
+ pub fn new(message: ShardMessage, response_sender:
Option<Sender<ShardResponse>>) -> Self {
Self {
- client_id,
message,
response_sender,
}
}
}
-#[derive(Debug)]
-pub enum ShardResponse {
- BinaryResponse(Bytes),
- ErrorResponse(IggyError),
-}
-
#[macro_export]
macro_rules! handle_response {
($sender:expr, $response:expr) => {
diff --git a/core/server/src/shard/transmission/message.rs
b/core/server/src/shard/transmission/message.rs
index cbf4fdd4..360e7a3c 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,5 +1,7 @@
use std::rc::Rc;
+use iggy_common::PollingStrategy;
+
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,11 +19,11 @@ use std::rc::Rc;
* specific language governing permissions and limitations
* under the License.
*/
-use crate::{binary::command::ServerCommand, streaming::session::Session};
+use crate::{shard::system::messages::PollingArgs,
streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut,
session::Session}};
#[derive(Debug)]
pub enum ShardMessage {
- Command(ServerCommand),
+ Request(ShardRequest),
Event(ShardEvent),
}
@@ -30,9 +32,25 @@ pub enum ShardEvent {
NewSession(),
}
-impl From<ServerCommand> for ShardMessage {
- fn from(command: ServerCommand) -> Self {
- ShardMessage::Command(command)
+#[derive(Debug)]
+pub enum ShardRequest {
+ SendMessages {
+ stream_id: u32,
+ topic_id: u32,
+ partition_id: u32,
+ batch: IggyMessagesBatchMut,
+ },
+ PollMessages {
+ partition_id: u32,
+ args: PollingArgs,
+ consumer: PollingConsumer,
+ count: u32,
+ },
+}
+
+impl From<ShardRequest> for ShardMessage {
+ fn from(request: ShardRequest) -> Self {
+ ShardMessage::Request(request)
}
}
diff --git a/core/server/src/shard/transmission/mod.rs
b/core/server/src/shard/transmission/mod.rs
index 1f9a79ed..4ed9bd1e 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+pub mod event;
pub mod connector;
pub mod frame;
pub mod message;
diff --git a/core/server/src/streaming/partitions/messages.rs
b/core/server/src/streaming/partitions/messages.rs
index cc961370..3b2d942b 100644
--- a/core/server/src/streaming/partitions/messages.rs
+++ b/core/server/src/streaming/partitions/messages.rs
@@ -21,7 +21,7 @@ use crate::streaming::partitions::partition::Partition;
use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::segments::*;
use error_set::ErrContext;
-use iggy_common::{Confirmation, IggyError, IggyTimestamp, Sizeable};
+use iggy_common::{IggyError, IggyTimestamp, Sizeable};
use std::sync::atomic::Ordering;
use tracing::trace;
@@ -218,10 +218,7 @@ impl Partition {
Ok(batches)
}
- pub async fn append_messages(
- &mut self,
- batch: IggyMessagesBatchMut,
- ) -> Result<(), IggyError> {
+ pub async fn append_messages(&mut self, batch: IggyMessagesBatchMut) ->
Result<(), IggyError> {
if batch.count() == 0 {
return Ok(());
}
@@ -564,10 +561,7 @@ mod tests {
.map(|m| m.get_size_bytes().as_bytes_u32())
.sum();
let initial_batch =
IggyMessagesBatchMut::from_messages(&initial_messages, initial_size);
- partition
- .append_messages(initial_batch)
- .await
- .unwrap();
+ partition.append_messages(initial_batch).await.unwrap();
// Now try to add only duplicates
let duplicate_messages = vec![
@@ -582,10 +576,7 @@ mod tests {
.sum();
let duplicate_batch =
IggyMessagesBatchMut::from_messages(&duplicate_messages,
duplicate_size);
- partition
- .append_messages(duplicate_batch)
- .await
- .unwrap();
+ partition.append_messages(duplicate_batch).await.unwrap();
let loaded_messages = partition.get_messages_by_offset(0,
10).await.unwrap();
@@ -739,8 +730,7 @@ mod tests {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await,
+ ),
temp_dir,
)
}
diff --git a/core/server/src/streaming/partitions/partition.rs
b/core/server/src/streaming/partitions/partition.rs
index 1b8bacf8..4a573d14 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -84,7 +84,7 @@ impl ConsumerOffset {
impl Partition {
#[allow(clippy::too_many_arguments)]
- pub async fn create(
+ pub fn create(
stream_id: u32,
topic_id: u32,
partition_id: u32,
@@ -247,8 +247,7 @@ mod tests {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
assert_eq!(partition.stream_id, stream_id);
assert_eq!(partition.topic_id, topic_id);
@@ -290,8 +289,7 @@ mod tests {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
assert!(partition.segments.is_empty());
}
}
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs
b/core/server/src/streaming/segments/messages/messages_reader.rs
index 0f413650..4091e8e3 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -16,9 +16,9 @@
* under the License.
*/
-use crate::io::file::{IggyFile};
+use crate::io::file::IggyFile;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
-use crate::streaming::utils::{file, PooledBuffer};
+use crate::streaming::utils::{PooledBuffer, file};
use bytes::BytesMut;
use error_set::ErrContext;
use iggy_common::IggyError;
@@ -174,19 +174,18 @@ impl MessagesReader {
len: u32,
use_pool: bool,
) -> Result<PooledBuffer, std::io::Error> {
- if use_pool {
- let mut buf = PooledBuffer::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
- result?;
- Ok(buf)
-
- } else {
- let mut buf = BytesMut::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
- result?;
- Ok(PooledBuffer::from_existing(buf))
- }
+ if use_pool {
+ let mut buf = PooledBuffer::with_capacity(len as usize);
+ unsafe { buf.set_len(len as usize) };
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
+ Ok(buf)
+ } else {
+ let mut buf = BytesMut::with_capacity(len as usize);
+ unsafe { buf.set_len(len as usize) };
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
+ Ok(PooledBuffer::from_existing(buf))
+ }
}
}
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 0429817f..63e424a9 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -21,7 +21,7 @@ use crate::{
streaming::segments::{IggyMessagesBatchSet, messages::write_batch},
};
use error_set::ErrContext;
-use iggy_common::{Confirmation, IggyByteSize, IggyError};
+use iggy_common::{IggyByteSize, IggyError};
use monoio::fs::{File, OpenOptions};
use std::sync::{
Arc,
diff --git a/core/server/src/streaming/streams/storage.rs
b/core/server/src/streaming/streams/storage.rs
index 53639de9..ebd4946a 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -20,6 +20,7 @@ use crate::state::system::StreamState;
use crate::streaming::storage::StreamStorage;
use crate::streaming::streams::COMPONENT;
use crate::streaming::streams::stream::Stream;
+use crate::streaming::topics::topic::CreatedTopicInfo;
use crate::streaming::topics::topic::Topic;
use ahash::AHashSet;
use error_set::ErrContext;
@@ -83,7 +84,7 @@ impl StreamStorage for FileStreamStorage {
}
let topic_state = topic_state.unwrap();
- let topic = Topic::empty(
+ let topic_info = Topic::empty(
stream.stream_id,
topic_id,
&topic_state.name,
@@ -92,9 +93,8 @@ impl StreamStorage for FileStreamStorage {
stream.segments_count.clone(),
stream.config.clone(),
stream.storage.clone(),
- )
- .await;
- unloaded_topics.push(topic);
+ );
+ unloaded_topics.push(topic_info.topic);
}
let state_topic_ids =
state.topics.keys().copied().collect::<AHashSet<u32>>();
@@ -123,7 +123,7 @@ impl StreamStorage for FileStreamStorage {
);
for topic_id in missing_ids {
let topic_state = state.topics.get(&topic_id).unwrap();
- let topic = Topic::empty(
+ let topic_info = Topic::empty(
stream.stream_id,
topic_id,
&topic_state.name,
@@ -132,8 +132,8 @@ impl StreamStorage for FileStreamStorage {
stream.segments_count.clone(),
stream.config.clone(),
stream.storage.clone(),
- )
- .await;
+ );
+ let topic = topic_info.topic;
topic.persist().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to
persist topic: {topic}")
})?;
diff --git a/core/server/src/streaming/streams/topics.rs
b/core/server/src/streaming/streams/topics.rs
index 1d47cc36..6a201c7a 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -18,6 +18,7 @@
use crate::streaming::streams::COMPONENT;
use crate::streaming::streams::stream::Stream;
+use crate::streaming::topics::topic::CreatedTopicInfo;
use crate::streaming::topics::topic::Topic;
use error_set::ErrContext;
use iggy_common::CompressionAlgorithm;
@@ -44,7 +45,7 @@ impl Stream {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: u8,
- ) -> Result<u32, IggyError> {
+ ) -> Result<(u32, Vec<u32>), IggyError> {
let max_topic_size = Topic::get_max_topic_size(max_topic_size,
&self.config)?;
if self.topics_ids.contains_key(name) {
return Err(IggyError::TopicNameAlreadyExists(
@@ -74,7 +75,10 @@ impl Stream {
return Err(IggyError::TopicIdAlreadyExists(id, self.stream_id));
}
- let topic = Topic::create(
+ let CreatedTopicInfo {
+ topic,
+ partition_ids,
+ } = Topic::create(
self.stream_id,
id,
name,
@@ -88,15 +92,15 @@ impl Stream {
compression_algorithm,
max_topic_size,
replication_factor,
- )
- .await?;
+ )?;
+
topic.persist().await.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
})?;
info!("Created topic {}", topic);
self.topics_ids.insert(name.to_owned(), id);
self.topics.insert(id, topic);
- Ok(id)
+ Ok((id, partition_ids))
}
pub async fn update_topic(
diff --git a/core/server/src/streaming/topics/consumer_groups.rs
b/core/server/src/streaming/topics/consumer_groups.rs
index 28a8705c..79c7f9f3 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -458,7 +458,7 @@ mod tests {
let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
- Topic::create(
+ let topic_info = Topic::create(
stream_id,
id,
name,
@@ -473,7 +473,7 @@ mod tests {
MaxTopicSize::ServerDefault,
1,
)
- .await
- .unwrap()
+ .unwrap();
+ topic_info.topic
}
}
diff --git a/core/server/src/streaming/topics/messages.rs
b/core/server/src/streaming/topics/messages.rs
index acbe8c28..2f8f0557 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -25,7 +25,7 @@ use crate::streaming::utils::hash;
use ahash::AHashMap;
use error_set::ErrContext;
use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{Confirmation, IggyTimestamp, PollingStrategy};
+use iggy_common::{IggyTimestamp, PollingStrategy};
use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind,
PollingKind};
use std::sync::atomic::Ordering;
use tracing::trace;
@@ -78,8 +78,8 @@ impl Topic {
pub async fn append_messages(
&self,
- partitioning: &Partitioning,
- messages: IggyMessagesBatchMut,
+ partition_id: u32,
+ batch: IggyMessagesBatchMut,
) -> Result<(), IggyError> {
if !self.has_partitions() {
return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
@@ -91,24 +91,11 @@ impl Topic {
return Err(IggyError::TopicFull(self.topic_id, self.stream_id));
}
- if messages.is_empty() {
+ if batch.is_empty() {
return Ok(());
}
- let partition_id = match partitioning.kind {
- PartitioningKind::Balanced => self.get_next_partition_id(),
- PartitioningKind::PartitionId => u32::from_le_bytes(
- partitioning.value[..partitioning.length as usize]
- .try_into()
- .map_err(|_| IggyError::InvalidNumberEncoding)?,
- ),
- PartitioningKind::MessagesKey => {
-
self.calculate_partition_id_by_messages_key_hash(&partitioning.value)
- }
- };
-
- self.append_messages_to_partition(messages, partition_id)
- .await
+ self.append_messages_to_partition(batch, partition_id).await
}
pub async fn flush_unsaved_buffer(
@@ -152,6 +139,20 @@ impl Topic {
Ok(())
}
+ pub fn calculate_partition_id(&self, partitioning: &Partitioning) ->
Result<u32, IggyError> {
+ match partitioning.kind {
+ PartitioningKind::Balanced => Ok(self.get_next_partition_id()),
+ PartitioningKind::PartitionId => Ok(u32::from_le_bytes(
+ partitioning.value[..partitioning.length as usize]
+ .try_into()
+ .map_err(|_| IggyError::InvalidNumberEncoding)?,
+ )),
+ PartitioningKind::MessagesKey => {
+
Ok(self.calculate_partition_id_by_messages_key_hash(&partitioning.value))
+ }
+ }
+ }
+
fn get_next_partition_id(&self) -> u32 {
let mut partition_id = self.current_partition_id.fetch_add(1,
Ordering::SeqCst);
let partitions_count = self.partitions.len() as u32;
@@ -227,10 +228,8 @@ mod tests {
.build()
.expect("Failed to create message with valid payload and
headers");
let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
- topic
- .append_messages(&partitioning, messages)
- .await
- .unwrap();
+ let partition =
topic.calculate_partition_id(&partitioning).unwrap();
+ topic.append_messages(partition, messages).await.unwrap();
}
let partitions = topic.get_partitions();
@@ -260,10 +259,10 @@ mod tests {
.build()
.expect("Failed to create message with valid payload and
headers");
let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
- topic
- .append_messages(&partitioning, messages)
- .await
- .unwrap();
+ let partition_id = topic
+ .calculate_partition_id(&partitioning)
+ .expect("Failed to calculate partition ID");
+ topic.append_messages(partition_id, messages).await.unwrap();
}
let mut read_messages_count = 0;
@@ -337,7 +336,7 @@ mod tests {
let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
- let topic = Topic::create(
+ let created_info = Topic::create(
stream_id,
id,
name,
@@ -352,8 +351,8 @@ mod tests {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_info.topic;
topic.persist().await.unwrap();
topic
}
diff --git a/core/server/src/streaming/topics/partitions.rs
b/core/server/src/streaming/topics/partitions.rs
index dfedf0a9..b9e56331 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -34,7 +34,7 @@ impl Topic {
self.partitions.len() as u32
}
- pub async fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>,
IggyError> {
+ pub fn add_partitions(&mut self, count: u32) -> Result<Vec<u32>,
IggyError> {
if count == 0 {
return Ok(vec![]);
}
@@ -60,8 +60,7 @@ impl Topic {
self.size_bytes.clone(),
self.segments_count_of_parent_stream.clone(),
IggyTimestamp::now(),
- )
- .await;
+ );
self.partitions
.insert(partition_id, IggyRwLock::new(partition));
partition_ids.push(partition_id)
@@ -71,12 +70,9 @@ impl Topic {
}
pub async fn add_persisted_partitions(&mut self, count: u32) ->
Result<Vec<u32>, IggyError> {
- let partition_ids = self
- .add_partitions(count)
- .await
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to add
partitions, count: {count}")
- })?;
+ let partition_ids =
self.add_partitions(count).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to add partitions,
count: {count}")
+ })?;
for partition_id in &partition_ids {
let partition = self.partitions.get(partition_id).unwrap();
let mut partition = partition.write().await;
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index db74fc98..827ba6c9 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -113,8 +113,7 @@ impl TopicStorage for FileTopicStorage {
topic.size_bytes.clone(),
topic.segments_count_of_parent_stream.clone(),
partition_state.created_at,
- )
- .await;
+ );
unloaded_partitions.push(partition);
}
@@ -160,8 +159,7 @@ impl TopicStorage for FileTopicStorage {
topic.size_bytes.clone(),
topic.segments_count_of_parent_stream.clone(),
partition_state.created_at,
- )
- .await;
+ );
partition.persist().await.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to persist
partition: {partition}"
diff --git a/core/server/src/streaming/topics/topic.rs
b/core/server/src/streaming/topics/topic.rs
index 2e9e38c9..43b28c49 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -63,9 +63,14 @@ pub struct Topic {
pub created_at: IggyTimestamp,
}
+pub struct CreatedTopicInfo {
+ pub topic: Topic,
+ pub partition_ids: Vec<u32>,
+}
+
impl Topic {
#[allow(clippy::too_many_arguments)]
- pub async fn empty(
+ pub fn empty(
stream_id: u32,
topic_id: u32,
name: &str,
@@ -74,7 +79,7 @@ impl Topic {
segments_count_of_parent_stream: Arc<AtomicU32>,
config: Arc<SystemConfig>,
storage: Rc<SystemStorage>,
- ) -> Topic {
+ ) -> CreatedTopicInfo {
Topic::create(
stream_id,
topic_id,
@@ -90,12 +95,11 @@ impl Topic {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap()
}
#[allow(clippy::too_many_arguments)]
- pub async fn create(
+ pub fn create(
stream_id: u32,
topic_id: u32,
name: &str,
@@ -109,7 +113,7 @@ impl Topic {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: u8,
- ) -> Result<Topic, IggyError> {
+ ) -> Result<CreatedTopicInfo, IggyError> {
let path = config.get_topic_path(stream_id, topic_id);
let partitions_path = config.get_partitions_path(stream_id, topic_id);
let mut topic = Topic {
@@ -142,8 +146,11 @@ impl Topic {
message_expiry, topic.message_expiry
);
- topic.add_partitions(partitions_count).await?;
- Ok(topic)
+ let partition_ids = topic.add_partitions(partitions_count)?;
+ Ok(CreatedTopicInfo {
+ topic,
+ partition_ids,
+ })
}
pub fn is_full(&self) -> bool {
@@ -313,7 +320,7 @@ mod tests {
let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
let segments_count_of_parent_stream = Arc::new(AtomicU32::new(0));
- let topic = Topic::create(
+ let topic_info = Topic::create(
stream_id,
topic_id,
name,
@@ -328,9 +335,8 @@ mod tests {
max_topic_size,
replication_factor,
)
- .await
.unwrap();
-
+ let topic = topic_info.topic;
assert_eq!(topic.stream_id, stream_id);
assert_eq!(topic.topic_id, topic_id);
assert_eq!(topic.path, path);