This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 3181db25 feat(io_uring): fix send/poll (#1930)
3181db25 is described below
commit 3181db256fb98af0196f33a6d97ceef34f964a33
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sun Jun 29 22:49:43 2025 +0200
feat(io_uring): fix send/poll (#1930)
---
Cargo.lock | 12 ++
core/integration/tests/streaming/get_by_offset.rs | 3 +-
.../tests/streaming/get_by_timestamp.rs | 4 +-
core/integration/tests/streaming/messages.rs | 6 +-
core/integration/tests/streaming/partition.rs | 15 +--
core/integration/tests/streaming/stream.rs | 6 +-
core/integration/tests/streaming/topic.rs | 29 ++--
core/integration/tests/streaming/topic_messages.rs | 25 ++--
core/server/Cargo.toml | 2 +-
.../handlers/messages/poll_messages_handler.rs | 67 ++++-----
.../handlers/messages/send_messages_handler.rs | 64 ++++-----
.../handlers/streams/create_stream_handler.rs | 12 +-
.../binary/handlers/topics/create_topic_handler.rs | 18 ++-
.../binary/handlers/users/login_user_handler.rs | 18 ++-
core/server/src/bootstrap.rs | 11 +-
core/server/src/configs/displays.rs | 5 +-
core/server/src/io/file.rs | 9 +-
core/server/src/main.rs | 35 ++---
core/server/src/shard/builder.rs | 2 +-
core/server/src/shard/mod.rs | 149 ++++++++++++++++-----
core/server/src/shard/system/messages.rs | 4 +-
core/server/src/shard/system/streams.rs | 41 ++++--
core/server/src/shard/system/topics.rs | 98 ++++++++++++--
core/server/src/shard/system/users.rs | 15 +++
core/server/src/shard/tasks/messages.rs | 38 ++++++
core/server/src/shard/tasks/mod.rs | 1 +
core/server/src/shard/transmission/event.rs | 29 ++--
core/server/src/shard/transmission/frame.rs | 7 +-
core/server/src/shard/transmission/message.rs | 40 ++++--
core/server/src/shard/transmission/mod.rs | 2 +-
core/server/src/state/file.rs | 6 +-
.../src/streaming/segments/indexes/index_reader.rs | 20 ++-
.../src/streaming/segments/indexes/index_writer.rs | 18 ++-
.../src/streaming/segments/indexes/indexes_mut.rs | 7 +-
core/server/src/streaming/segments/messages/mod.rs | 64 ++-------
core/server/src/streaming/segments/segment.rs | 3 +-
.../streaming/segments/types/messages_batch_mut.rs | 4 +
.../src/streaming/segments/writing_messages.rs | 3 +-
core/server/src/streaming/streams/storage.rs | 17 +--
core/server/src/streaming/topics/messages.rs | 2 +-
core/server/src/streaming/topics/storage.rs | 15 ++-
core/server/src/streaming/utils/memory_pool.rs | 3 +-
core/server/src/streaming/utils/pooled_buffer.rs | 12 +-
core/server/src/tcp/connection_handler.rs | 10 +-
core/server/src/tcp/sender.rs | 8 +-
core/server/src/tcp/tcp_listener.rs | 23 ++--
core/server/src/tcp/tcp_server.rs | 9 +-
core/server/src/tcp/tcp_socket.rs | 15 ++-
48 files changed, 631 insertions(+), 375 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c2e95f8c..c386d838 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4795,6 +4795,7 @@ checksum =
"3bd0f8bcde87b1949f95338b547543fcab187bc7e7a5024247e359a5e828ba6a"
dependencies = [
"auto-const-array",
"bytes",
+ "flume",
"fxhash",
"io-uring",
"libc",
@@ -4802,8 +4803,10 @@ dependencies = [
"mio 0.8.11",
"monoio-macros",
"nix 0.26.4",
+ "once_cell",
"pin-project-lite",
"socket2",
+ "threadpool",
"windows-sys 0.48.0",
]
@@ -7435,6 +7438,15 @@ dependencies = [
"cfg-if",
]
+[[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
[[package]]
name = "time"
version = "0.3.41"
diff --git a/core/integration/tests/streaming/get_by_offset.rs
b/core/integration/tests/streaming/get_by_offset.rs
index df9f8f40..e5cc2a33 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -127,8 +127,7 @@ async fn test_get_messages_by_offset(
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
index 93a178aa..7aff4a89 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -18,7 +18,6 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::BytesMut;
-use iggy::prelude::Confirmation;
use iggy::prelude::*;
use server::configs::cache_indexes::CacheIndexesConfig;
use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
@@ -129,8 +128,7 @@ async fn test_get_messages_by_timestamp(
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
diff --git a/core/integration/tests/streaming/messages.rs
b/core/integration/tests/streaming/messages.rs
index 6ab6ca5b..c6363d9a 100644
--- a/core/integration/tests/streaming/messages.rs
+++ b/core/integration/tests/streaming/messages.rs
@@ -58,8 +58,7 @@ async fn
should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
@@ -126,8 +125,7 @@ async fn
should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
- )
- .await;
+ );
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
diff --git a/core/integration/tests/streaming/partition.rs
b/core/integration/tests/streaming/partition.rs
index 4fff0d04..6d811e59 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -49,8 +49,7 @@ async fn should_persist_partition_with_segment() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
partition.persist().await.unwrap();
@@ -81,8 +80,7 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
@@ -101,8 +99,7 @@ async fn should_load_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
now,
- )
- .await;
+ );
let partition_state = PartitionState {
id: partition.partition_id,
created_at: now,
@@ -151,8 +148,7 @@ async fn should_delete_existing_partition_from_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
@@ -185,8 +181,7 @@ async fn should_purge_existing_partition_on_disk() {
Arc::new(AtomicU64::new(0)),
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
- )
- .await;
+ );
partition.persist().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
let messages = create_messages();
diff --git a/core/integration/tests/streaming/stream.rs
b/core/integration/tests/streaming/stream.rs
index 5b340750..677e2d32 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -145,10 +145,10 @@ async fn should_purge_existing_stream_on_disk() {
let topic = stream
.get_topic(&Identifier::numeric(topic_id).unwrap())
.unwrap();
- topic
- .append_messages(&Partitioning::partition_id(1), batch)
- .await
+ let partition_id = topic
+ .calculate_partition_id(&Partitioning::partition_id(1))
.unwrap();
+ topic.append_messages(partition_id, batch).await.unwrap();
let (_, loaded_messages) = topic
.get_messages(
PollingConsumer::Consumer(1, 1),
diff --git a/core/integration/tests/streaming/topic.rs
b/core/integration/tests/streaming/topic.rs
index f75ffe2b..e3d17df2 100644
--- a/core/integration/tests/streaming/topic.rs
+++ b/core/integration/tests/streaming/topic.rs
@@ -38,7 +38,7 @@ async fn
should_persist_topics_with_partitions_directories_and_info_file() {
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{topic_id}");
- let topic = Topic::create(
+ let created_topic_info = Topic::create(
stream_id,
topic_id,
&name,
@@ -53,8 +53,8 @@ async fn
should_persist_topics_with_partitions_directories_and_info_file() {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_topic_info.topic;
topic.persist().await.unwrap();
@@ -76,7 +76,7 @@ async fn should_load_existing_topic_from_disk() {
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{topic_id}");
- let topic = Topic::create(
+ let created_topic_info = Topic::create(
stream_id,
topic_id,
&name,
@@ -91,8 +91,9 @@ async fn should_load_existing_topic_from_disk() {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_topic_info.topic;
+
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
@@ -102,7 +103,7 @@ async fn should_load_existing_topic_from_disk() {
.await;
let created_at = IggyTimestamp::now();
- let mut loaded_topic = Topic::empty(
+ let created_topic_info = Topic::empty(
stream_id,
topic_id,
&name,
@@ -111,8 +112,8 @@ async fn should_load_existing_topic_from_disk() {
Arc::new(AtomicU32::new(0)),
setup.config.clone(),
setup.storage.clone(),
- )
- .await;
+ );
+ let mut loaded_topic = created_topic_info.topic;
let topic_state = TopicState {
id: topic_id,
name,
@@ -153,7 +154,7 @@ async fn should_delete_existing_topic_from_disk() {
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{topic_id}");
- let topic = Topic::create(
+ let created_topic_info = Topic::create(
stream_id,
topic_id,
&name,
@@ -168,8 +169,8 @@ async fn should_delete_existing_topic_from_disk() {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_topic_info.topic;
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
@@ -193,7 +194,7 @@ async fn should_purge_existing_topic_on_disk() {
let topic_ids = get_topic_ids();
for topic_id in topic_ids {
let name = format!("test-{topic_id}");
- let topic = Topic::create(
+ let created_topic_info = Topic::create(
stream_id,
topic_id,
&name,
@@ -208,8 +209,8 @@ async fn should_purge_existing_topic_on_disk() {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_topic_info.topic;
topic.persist().await.unwrap();
assert_persisted_topic(
&topic.path,
@@ -225,10 +226,10 @@ async fn should_purge_existing_topic_on_disk() {
.map(|msg| msg.get_size_bytes().as_bytes_u32())
.sum::<u32>();
let batch = IggyMessagesBatchMut::from_messages(&messages, batch_size);
- topic
- .append_messages(&Partitioning::partition_id(1), batch)
- .await
+ let partition_id = topic
+ .calculate_partition_id(&Partitioning::partition_id(1))
.unwrap();
+ topic.append_messages(partition_id, batch).await.unwrap();
let (_, loaded_messages) = topic
.get_messages(
PollingConsumer::Consumer(1, 1),
diff --git a/core/integration/tests/streaming/topic_messages.rs
b/core/integration/tests/streaming/topic_messages.rs
index 19de255f..3e5362ac 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -62,7 +62,8 @@ async fn assert_polling_messages() {
.map(|m| m.get_size_bytes())
.sum::<IggyByteSize>();
let batch = IggyMessagesBatchMut::from_messages(&messages,
batch_size.as_bytes_u32());
- topic.append_messages(&partitioning, batch).await.unwrap();
+ let partition_id = topic.calculate_partition_id(&partitioning).unwrap();
+ topic.append_messages(partition_id, batch).await.unwrap();
let consumer = PollingConsumer::Consumer(1, partition_id);
let (_, polled_messages) = topic
@@ -106,10 +107,8 @@ async fn
given_key_none_messages_should_be_appended_to_the_next_partition_using_
.expect("Failed to create message")],
batch_size.as_bytes_u32(),
);
- topic
- .append_messages(&partitioning, messages)
- .await
- .unwrap();
+ let partition_id =
topic.calculate_partition_id(&partitioning).unwrap();
+ topic.append_messages(partition_id, messages).await.unwrap();
}
for i in 1..=partitions_count {
assert_messages(&topic, i, messages_per_partition_count).await;
@@ -135,10 +134,8 @@ async fn
given_key_partition_id_messages_should_be_appended_to_the_chosen_partit
.expect("Failed to create message")],
batch_size.as_bytes_u32(),
);
- topic
- .append_messages(&partitioning, messages)
- .await
- .unwrap();
+ let partition_id =
topic.calculate_partition_id(&partitioning).unwrap();
+ topic.append_messages(partition_id, messages).await.unwrap();
}
for i in 1..=partitions_count {
@@ -168,10 +165,8 @@ async fn
given_key_messages_key_messages_should_be_appended_to_the_calculated_pa
.expect("Failed to create message")],
batch_size.as_bytes_u32(),
);
- topic
- .append_messages(&partitioning, messages)
- .await
- .unwrap();
+ let partition_id =
topic.calculate_partition_id(&partitioning).unwrap();
+ topic.append_messages(partition_id, messages).await.unwrap();
}
let mut messages_count_per_partition = HashMap::new();
@@ -212,7 +207,7 @@ async fn init_topic(setup: &TestSetup, partitions_count:
u32) -> Topic {
setup.create_topics_directory(stream_id).await;
let id = 2;
let name = "test";
- let topic = Topic::create(
+ let created_topic_info = Topic::create(
stream_id,
id,
name,
@@ -227,8 +222,8 @@ async fn init_topic(setup: &TestSetup, partitions_count:
u32) -> Topic {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
+ let topic = created_topic_info.topic;
topic.persist().await.unwrap();
topic
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 59a10c29..26345967 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -66,7 +66,7 @@ lending-iterator = "0.1.7"
hash32 = "1.0.0"
mimalloc = { workspace = true, optional = true }
moka = { version = "0.12.10", features = ["future"] }
-monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat"] }
+monoio = { version = "0.2.4", features = ["mkdirat", "unlinkat", "renameat",
"sync"] }
monoio-native-tls = "0.4.0"
nix = { version = "0.30", features = ["fs"] }
once_cell = "1.21.3"
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index c590dcf2..3abfb6a1 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -21,10 +21,10 @@ use crate::binary::handlers::messages::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
use crate::shard::namespace::IggyNamespace;
+use crate::shard::system::messages::PollingArgs;
use crate::shard::transmission::frame::ShardResponse;
-use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::shard::{IggyShard, ShardRequestResult};
-use crate::shard::system::messages::PollingArgs;
use crate::streaming::segments::IggyMessagesBatchSet;
use crate::streaming::session::Session;
use crate::to_iovec;
@@ -112,53 +112,44 @@ impl ServerCommandHandler for PollMessages {
};
let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
- let request = ShardRequest::PollMessages {
- consumer,
- partition_id,
+ let payload = ShardRequestPayload::PollMessages {
+ consumer: consumer.clone(),
args,
count,
};
+ let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
let message = ShardMessage::Request(request);
let (metadata, batch) = match shard.send_request_to_shard(&namespace,
message).await {
- ShardRequestResult::SameShard(message) => {
- match message {
- ShardMessage::Request(request) => {
- match request {
- ShardRequest::PollMessages {
- consumer,
- partition_id,
- args,
- count,
- } => {
- topic.get_messages(consumer, partition_id,
args.strategy, count).await?
-
- }
- _ => unreachable!(
- "Expected a SendMessages request inside of
SendMessages handler, impossible state"
- ),
- }
- }
- _ => unreachable!(
- "Expected a request message inside of an command
handler, impossible state"
- ),
- }
- }
- ShardRequestResult::Result(result) => {
- match result? {
- ShardResponse::PollMessages(response) => {
- response
- }
- ShardResponse::ErrorResponse(err) => {
- return Err(err);
+ ShardRequestResult::SameShard(message) => match message {
+ ShardMessage::Request(request) => match request.payload {
+ ShardRequestPayload::PollMessages {
+ consumer,
+ args,
+ count,
+ } => {
+ topic
+ .get_messages(consumer, partition_id,
args.strategy, count)
+ .await?
}
_ => unreachable!(
- "Expected a PollMessages response inside of
PollMessages handler, impossible state"
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
),
+ },
+ _ => unreachable!(
+ "Expected a request message inside of an command handler,
impossible state"
+ ),
+ },
+ ShardRequestResult::Result(result) => match result? {
+ ShardResponse::PollMessages(response) => response,
+ ShardResponse::ErrorResponse(err) => {
+ return Err(err);
}
- }
+ _ => unreachable!(
+ "Expected a PollMessages response inside of PollMessages
handler, impossible state"
+ ),
+ },
};
-
// Collect all chunks first into a Vec to extend their lifetimes.
// This ensures the Bytes (in reality Arc<[u8]>) references from each
IggyMessagesBatch stay alive
// throughout the async vectored I/O operation, preventing "borrowed
value does not live
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 61e2c39c..f4497bf6 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -21,7 +21,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommandHandler};
use crate::binary::sender::SenderKind;
use crate::shard::namespace::IggyNamespace;
use crate::shard::transmission::frame::ShardResponse;
-use crate::shard::transmission::message::{ShardMessage, ShardRequest};
+use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::shard::{IggyShard, ShardRequestResult};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
@@ -150,12 +150,8 @@ impl ServerCommandHandler for SendMessages {
let batch = shard.maybe_encrypt_messages(batch)?;
let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
- let request = ShardRequest::SendMessages {
- stream_id,
- topic_id,
- partition_id,
- batch,
- };
+ let payload = ShardRequestPayload::SendMessages { batch };
+ let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
let message = ShardMessage::Request(request);
// Egh... I don't like those nested match statements,
// Technically there are only two `request` types that will ever be
dispatched
@@ -164,43 +160,31 @@ impl ServerCommandHandler for SendMessages {
// how to make this code reusable, in a sense that we will have
exactly the same code inside of
// `PollMessages` handler, but with different request....
match shard.send_request_to_shard(&namespace, message).await {
- ShardRequestResult::SameShard(message) => {
- match message {
- ShardMessage::Request(request) => {
- match request {
- ShardRequest::SendMessages {
- stream_id,
- topic_id,
- partition_id,
- batch,
- } => {
- // Just shut up rust analyzer.
- let _stream_id = stream_id;
- let _topic_id = topic_id;
- topic.append_messages(partition_id,
batch).await?
- }
- _ => unreachable!(
- "Expected a SendMessages request inside of
SendMessages handler, impossible state"
- ),
+ ShardRequestResult::SameShard(message) => match message {
+ ShardMessage::Request(request) => {
+ let partition_id = request.partition_id;
+ match request.payload {
+ ShardRequestPayload::SendMessages { batch } => {
+ topic.append_messages(partition_id, batch).await?
}
+ _ => unreachable!(
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
+ ),
}
- _ => unreachable!(
- "Expected a request message inside of an command
handler, impossible state"
- ),
}
- }
- ShardRequestResult::Result(result) => {
- match result? {
- ShardResponse::SendMessages => {
- ()
- }
- ShardResponse::ErrorResponse(err) => {
- return Err(err);
- }
- _ => unreachable!("Expected a SendMessages response inside
of SendMessages handler, impossible state"),
+ _ => unreachable!(
+ "Expected a request message inside of an command handler,
impossible state"
+ ),
+ },
+ ShardRequestResult::Result(result) => match result? {
+ ShardResponse::SendMessages => (),
+ ShardResponse::ErrorResponse(err) => {
+ return Err(err);
}
-
- }
+ _ => unreachable!(
+ "Expected a SendMessages response inside of SendMessages
handler, impossible state"
+ ),
+ },
};
sender.send_empty_ok_response().await?;
Ok(())
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index a00afde7..0e746b76 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::state::models::CreateStreamWithId;
use crate::streaming::session::Session;
@@ -46,28 +47,31 @@ impl ServerCommandHandler for CreateStream {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id;
-
+ let name = self.name.clone();
let created_stream_id = shard
- .create_stream(session, self.stream_id, &self.name)
+ .create_stream(session, stream_id, &name)
.await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to create
stream with id: {stream_id:?}, session: {session}"
)
})?;
+ let event = ShardEvent::CreatedStream { stream_id, name };
+ // Broadcast the event to all shards.
+ let _responses = shard.broadcast_event_to_all_shards(event.into());
+
let stream = shard.find_stream(session, &created_stream_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to find created
stream with id: {created_stream_id:?}, session: {session}"
)
})?;
- let stream_id = stream.stream_id;
let response = mapper::map_stream(&stream);
shard
.state
.apply(session.get_user_id(),
&EntryCommand::CreateStream(CreateStreamWithId {
- stream_id,
+ stream_id: stream.stream_id,
command: self
})) .await
.with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index db09863e..6785e4f0 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::state::models::CreateTopicWithId;
use crate::streaming::session::Session;
@@ -62,14 +63,27 @@ impl ServerCommandHandler for CreateTopic {
.await
.with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to create topic for stream_id: {stream_id}, topic_id:
{topic_id:?}"
))?;
+ let event = ShardEvent::CreatedTopic {
+ stream_id: stream_id.clone(),
+ topic_id,
+ name: self.name.clone(),
+ partitions_count: self.partitions_count,
+ message_expiry: self.message_expiry,
+ compression_algorithm: self.compression_algorithm,
+ max_topic_size: self.max_topic_size,
+ replication_factor: self.replication_factor,
+ };
+ // Broadcast the event to all shards.
+ let _responses = shard.broadcast_event_to_all_shards(event.into());
+
let stream = shard
- .find_stream(session, &self.stream_id)
+ .get_stream(&self.stream_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get stream for
stream_id: {stream_id}"
)
})?;
- let topic = shard.find_topic(session, &stream, &created_topic_id)
+ let topic = stream.get_topic(&created_topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get topic with
ID: {created_topic_id} in stream with ID: {stream_id}"
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 21cbba49..b6b163b6 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -23,6 +23,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -39,19 +40,30 @@ impl ServerCommandHandler for LoginUser {
async fn handle(
self,
sender: &mut SenderKind,
- length: u32,
+ _length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
+ let LoginUser {
+ username, password, ..
+ } = self;
let user = shard
- .login_user(&self.username, &self.password, Some(session))
+ .login_user(&username, &password, Some(session))
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to login user with
name: {}, session: {session}",
- self.username
+ username
)
})?;
+ let event = ShardEvent::LoginUser {
+ client_id: session.client_id,
+ username,
+ password,
+ };
+ // Broadcast the event to all shards.
+ let _responses = shard.broadcast_event_to_all_shards(event.into());
+
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
Ok(())
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index ebf8946e..2d9f7eaa 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -16,6 +16,7 @@ use crate::{
streaming::{
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
users::user::User,
+ utils::file::overwrite,
},
};
use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
@@ -47,6 +48,12 @@ pub async fn create_directories(config: &SystemConfig) ->
Result<(), IggyError>
if !Path::new(&state_path).exists() &&
create_dir_all(&state_path).await.is_err() {
return Err(IggyError::CannotCreateStateDirectory(state_path));
}
+ let state_log = config.get_state_messages_file_path();
+ if !Path::new(&state_log).exists() {
+ if let Err(_) = overwrite(&state_log).await {
+ return Err(IggyError::CannotCreateStateDirectory(state_log));
+ }
+ }
let streams_path = config.get_streams_path();
if !Path::new(&streams_path).exists() &&
create_dir_all(&streams_path).await.is_err() {
@@ -68,10 +75,6 @@ pub async fn create_directories(config: &SystemConfig) ->
Result<(), IggyError>
config.get_system_path()
);
Ok(())
-
- // TODO: Move this to individual shard level
- /*
- */
}
pub fn create_root_user() -> User {
diff --git a/core/server/src/configs/displays.rs
b/core/server/src/configs/displays.rs
index 51873421..b65804b5 100644
--- a/core/server/src/configs/displays.rs
+++ b/core/server/src/configs/displays.rs
@@ -290,10 +290,7 @@ impl Display for SegmentConfig {
write!(
f,
"{{ size_bytes: {}, cache_indexes: {}, message_expiry: {},
archive_expired: {} }}",
- self.size,
- self.cache_indexes,
- self.message_expiry,
- self.archive_expired,
+ self.size, self.cache_indexes, self.message_expiry,
self.archive_expired,
)
}
}
diff --git a/core/server/src/io/file.rs b/core/server/src/io/file.rs
index 61e51023..95129aa8 100644
--- a/core/server/src/io/file.rs
+++ b/core/server/src/io/file.rs
@@ -94,14 +94,9 @@ impl AsyncWriteRent for IggyFile {
(Ok(n), buf)
}
+ // This is bait!!!!
async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T>
{
- let slice = match IoVecWrapper::new(buf_vec) {
- Ok(slice) => slice,
- Err(buf) => return (Ok(0), buf),
- };
-
- let (result, slice) = self.write(slice).await;
- (result, slice.into_inner())
+ unimplemented!("writev is not implemented for IggyFile");
}
async fn flush(&mut self) -> std::io::Result<()> {
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 436b3829..5875e6bd 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -26,6 +26,7 @@ use dotenvy::dotenv;
use error_set::ErrContext;
use figlet_rs::FIGfont;
use iggy_common::create_user::CreateUser;
+use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
use server::args::Args;
use server::bootstrap::{
@@ -46,6 +47,7 @@ use server::state::command::EntryCommand;
use server::state::file::FileState;
use server::state::models::CreateUserWithId;
use server::state::system::SystemState;
+use server::streaming::utils::{MemoryPool, crypto};
use server::versioning::SemanticVersion;
use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
use tokio::time::Instant;
@@ -117,21 +119,27 @@ fn main() -> Result<(), ServerError> {
let mut logging = Logging::new(config.telemetry.clone());
logging.early_init();
+ // From this point on, we can use tracing macros to log messages.
+ logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
+
// TODO: Make this configurable from config as a range
// for example this instance of Iggy will use cores from 0..4
let available_cpus = available_parallelism().expect("Failed to get num of
cores");
let shards_count = available_cpus.into();
let shards_set = 0..shards_count;
let connections = create_shard_connections(shards_set.clone());
+ let gate = Arc::new(Gate::new());
+ let mut handles = Vec::with_capacity(shards_set.len());
for shard_id in shards_set {
- let gate: Arc<Gate<()>> = Arc::new(Gate::new());
let id = shard_id as u16;
+ let gate = gate.clone();
let connections = connections.clone();
let config = config.clone();
let state_persister =
resolve_persister(config.system.state.enforce_fsync);
- std::thread::Builder::new()
+ let handle = std::thread::Builder::new()
.name(format!("shard-{id}"))
.spawn(move || {
+ MemoryPool::init_pool(config.system.clone());
monoio::utils::bind_to_cpu_set(Some(shard_id))
.expect(format!("Failed to set CPU affinity for
shard-{id}").as_str());
@@ -159,7 +167,6 @@ fn main() -> Result<(), ServerError> {
// We can't use std::sync::Once because it doesn't support
async.
// Trait bound on the closure is FnOnce.
- let gate = gate.clone();
// Peek into the state to check if the root user exists.
// If it does not exist, create it.
gate.with_async::<Result<(), IggyError>>(async
|gate_state| {
@@ -180,9 +187,7 @@ fn main() -> Result<(), ServerError> {
entry
.command()
.and_then(|command| match command {
- EntryCommand::CreateUser(payload)
- if payload.command.username
- == IGGY_ROOT_USERNAME_ENV &&
payload.command.password == IGGY_ROOT_PASSWORD_ENV =>
+ EntryCommand::CreateUser(payload) if
payload.user_id == DEFAULT_ROOT_USER_ID =>
{
Ok(true)
}
@@ -238,19 +243,20 @@ fn main() -> Result<(), ServerError> {
.into();
//TODO: If one of the shards fails to initialize, we
should crash the whole program;
- shard.run().await.expect("Failed to run shard");
+ if let Err(e) = shard.run().await {
+ error!("Failed to run shard-{id}: {e}");
+ }
//TODO: If one of the shards fails to initialize, we
should crash the whole program;
- shard.assert_init();
- let shard = Rc::new(shard);
+ //shard.assert_init();
})
})
- .expect(format!("Failed to spawn thread for shard-{id}").as_str())
- .join()
- .expect(format!("Failed to join thread for shard-{id}").as_str());
+ .expect(format!("Failed to spawn thread for shard-{id}").as_str());
+ handles.push(handle);
}
- // From this point on, we can use tracing macros to log messages.
- logging.late_init(config.system.get_system_path(),
&config.system.logging)?;
+ handles.into_iter().for_each(|handle| {
+ handle.join().expect("Failed to join shard thread");
+ });
/*
#[cfg(feature = "disable-mimalloc")]
@@ -260,7 +266,6 @@ fn main() -> Result<(), ServerError> {
#[cfg(not(feature = "disable-mimalloc"))]
info!("Using mimalloc allocator");
- MemoryPool::init_pool(config.system.clone());
let system = SharedSystem::new(System::new(
config.system.clone(),
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index a2a3fdfb..beda9ae2 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -113,7 +113,7 @@ impl IggyShardBuilder {
version: version,
stop_receiver: stop_receiver,
stop_sender: stop_sender,
- frame_receiver: Cell::new(Some(frame_receiver)),
+ messages_receiver: Cell::new(Some(frame_receiver)),
metrics: Metrics::init(),
users: Default::default(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7976cca3..cfbd6eac 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -20,13 +20,14 @@ pub mod builder;
pub mod gate;
pub mod namespace;
pub mod system;
+pub mod tasks;
pub mod transmission;
use ahash::{AHashMap, AHashSet, HashMap};
use builder::IggyShardBuilder;
use error_set::ErrContext;
use futures::future::try_join_all;
-use iggy_common::{EncryptorKind, IggyError, UserId};
+use iggy_common::{EncryptorKind, Identifier, IggyError, UserId};
use namespace::IggyNamespace;
use std::{
cell::{Cell, RefCell},
@@ -46,23 +47,29 @@ use crate::{
configs::server::ServerConfig,
shard::{
system::info::SystemInfo,
+ tasks::messages::spawn_shard_message_task,
transmission::{
+ event::ShardEvent,
frame::{ShardFrame, ShardResponse},
- message::{ShardEvent, ShardMessage, ShardRequest},
+ message::{ShardMessage, ShardRequest, ShardRequestPayload},
},
},
state::{
- file::FileState, system::{StreamState, SystemState, UserState},
StateKind
+ StateKind,
+ file::FileState,
+ system::{StreamState, SystemState, UserState},
},
streaming::{
clients::client_manager::ClientManager,
diagnostics::metrics::Metrics,
+ partitions::partition,
personal_access_tokens::personal_access_token::PersonalAccessToken,
session::Session,
storage::SystemStorage,
streams::stream::Stream,
users::{permissioner::Permissioner, user::User},
},
+ tcp::tcp_server::spawn_tcp_server,
versioning::SemanticVersion,
};
@@ -90,16 +97,15 @@ impl Shard {
.sender
.send(ShardFrame::new(message, Some(sender.clone()))); //
Apparently sender needs to be cloned, otherwise channel will close...
//TODO: Fixme
- let response = receiver.recv().await
- .map_err(|err| {
- error!("Failed to receive response from shard: {err}");
- IggyError::ShardCommunicationError(self.id)
- })?;
+ let response = receiver.recv().await.map_err(|err| {
+ error!("Failed to receive response from shard: {err}");
+ IggyError::ShardCommunicationError(self.id)
+ })?;
Ok(response)
}
}
-struct ShardInfo {
+pub struct ShardInfo {
id: u16,
}
@@ -135,7 +141,7 @@ pub struct IggyShard {
pub(crate) users: RefCell<HashMap<UserId, User>>,
pub(crate) metrics: Metrics,
- pub frame_receiver: Cell<Option<Receiver<ShardFrame>>>,
+ pub messages_receiver: Cell<Option<Receiver<ShardFrame>>>,
stop_receiver: StopReceiver,
stop_sender: StopSender,
}
@@ -172,17 +178,16 @@ impl IggyShard {
// have the correct statistics when the server starts.
//self.get_stats().await?;
self.init().await?;
- self.assert_init();
+ // TODO: Fixme
+ //self.assert_init();
info!("Initiated shard with ID: {}", self.id);
// Create all tasks (tcp listener, http listener, command processor,
in the future also the background jobs).
- /*
- let mut tasks: Vec<Task> =
vec![Box::pin(spawn_shard_message_task(shard.clone()))];
+ let mut tasks: Vec<Task> =
vec![Box::pin(spawn_shard_message_task(self.clone()))];
if self.config.tcp.enabled {
tasks.push(Box::pin(spawn_tcp_server(self.clone())));
}
let result = try_join_all(tasks).await;
result?;
- */
Ok(())
}
@@ -427,6 +432,7 @@ impl IggyShard {
#[instrument(skip_all, name = "trace_shutdown")]
pub async fn shutdown(&mut self) -> Result<(), IggyError> {
+ //TODO: Fixme, impl cooperative shutdown.
self.persist_messages().await?;
Ok(())
}
@@ -449,6 +455,81 @@ impl IggyShard {
self.shards.len() as u32
}
+ pub async fn handle_shard_message(&self, message: ShardMessage) ->
Option<ShardResponse> {
+ match message {
+ ShardMessage::Request(request) => match
self.handle_request(request).await {
+ Ok(response) => Some(response),
+ Err(err) => Some(ShardResponse::ErrorResponse(err)),
+ },
+ ShardMessage::Event(event) => match self.handle_event(event).await
{
+ Ok(_) => Some(ShardResponse::Event),
+ Err(err) => Some(ShardResponse::ErrorResponse(err)),
+ },
+ }
+ }
+
+ async fn handle_request(&self, request: ShardRequest) ->
Result<ShardResponse, IggyError> {
+ let stream =
self.get_stream(&Identifier::numeric(request.stream_id)?)?;
+ let topic = stream.get_topic(&Identifier::numeric(request.topic_id)?)?;
+ let partition_id = request.partition_id;
+ match request.payload {
+ ShardRequestPayload::SendMessages { batch } => {
+ topic.append_messages(partition_id, batch).await?;
+ Ok(ShardResponse::SendMessages)
+ }
+ ShardRequestPayload::PollMessages {
+ args,
+ consumer,
+ count,
+ } => {
+ let (metadata, batch) = topic
+ .get_messages(consumer, partition_id, args.strategy, count)
+ .await?;
+ Ok(ShardResponse::PollMessages((metadata, batch)))
+ }
+ }
+ }
+
+ async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(),
IggyError> {
+ match &*event {
+ ShardEvent::CreatedStream { stream_id, name } => {
+ self.create_stream_bypass_auth(*stream_id, name).await
+ }
+ ShardEvent::CreatedTopic {
+ stream_id,
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ } => {
+ self.create_topic_bypass_auth(
+ stream_id,
+ *topic_id,
+ name,
+ *partitions_count,
+ *message_expiry,
+ *compression_algorithm,
+ *max_topic_size,
+ *replication_factor,
+ )
+ .await
+ }
+ ShardEvent::LoginUser {
+ client_id,
+ username,
+ password,
+ } => self.login_user_event(*client_id, username, password),
+ ShardEvent::NewSession { address, transport } => {
+ let session = self.add_client(address, *transport);
+ self.add_active_session(session);
+ Ok(())
+ }
+ }
+ }
+
pub async fn send_request_to_shard(
&self,
namespace: &IggyNamespace,
@@ -479,6 +560,25 @@ impl IggyShard {
}
}
+ pub fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) ->
Vec<ShardResponse> {
+ self.shards
+ .iter()
+ .filter_map(|shard| {
+ if shard.id != self.id {
+ Some(shard.connection.clone())
+ } else {
+ None
+ }
+ })
+ .map(|conn| {
+ // TODO: Fixme, maybe we should send response_sender
+ // and propagate errors back.
+ conn.send(ShardFrame::new(event.clone().into(), None));
+ ShardResponse::Event
+ })
+ .collect()
+ }
+
fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
let shards_table = self.shards_table.borrow();
shards_table.get(namespace).map(|shard_info| {
@@ -496,27 +596,6 @@ impl IggyShard {
self.shards_table.borrow_mut().extend(records);
}
- pub fn broadcast_event_to_all_shards(&self, client_id: u32, event:
ShardEvent) {
- self.shards
- .iter()
- .filter_map(|shard| {
- if shard.id != self.id {
- Some(shard.connection.clone())
- } else {
- None
- }
- })
- .map(|conn| {
- //TODO: Fixme
- /*
- let message = ShardMessage::Event(event);
- conn.send(ShardFrame::new(client_id, message, None));
- */
- ()
- })
- .collect::<Vec<_>>();
- }
-
pub fn add_active_session(&self, session: Rc<Session>) {
self.active_sessions.borrow_mut().push(session);
}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 76337a1a..36ae234b 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,8 +25,8 @@ use crate::streaming::utils::PooledBuffer;
use async_zip::tokio::read::stream;
use error_set::ErrContext;
use iggy_common::{
- BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier,
- IggyError, Partitioning, PollingStrategy,
+ BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier, IggyError,
+ Partitioning, PollingStrategy,
};
use tracing::{error, trace};
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index fecec536..47068f11 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -186,6 +186,19 @@ impl IggyShard {
}))
}
+ pub async fn create_stream_bypass_auth(
+ &self,
+ stream_id: Option<u32>,
+ name: &str,
+ ) -> Result<(), IggyError> {
+ let stream = self.create_stream_base(stream_id, name)?;
+ let id = stream.stream_id;
+ self.streams_ids.borrow_mut().insert(name.to_owned(), id);
+ self.streams.borrow_mut().insert(id, stream);
+ self.metrics.increment_streams(1);
+ Ok(())
+ }
+
pub async fn create_stream(
&self,
session: &Session,
@@ -199,6 +212,25 @@ impl IggyShard {
if self.streams_ids.borrow().contains_key(name) {
return Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
}
+ let stream = self
+ .create_stream_base(stream_id, name)
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to create
stream with name: {name}")
+ })?;
+ let id = stream.stream_id;
+
+ stream.persist().await?;
+ info!("Created stream with ID: {id}, name: '{name}'.");
+ self.streams_ids.borrow_mut().insert(name.to_owned(), id);
+ self.streams.borrow_mut().insert(id, stream);
+ self.metrics.increment_streams(1);
+ Ok(Identifier::numeric(id)?)
+ }
+
+ fn create_stream_base(&self, stream_id: Option<u32>, name: &str) ->
Result<Stream, IggyError> {
+ if self.streams_ids.borrow().contains_key(name) {
+ return Err(IggyError::StreamNameAlreadyExists(name.to_owned()));
+ }
let mut id;
if stream_id.is_none() {
@@ -222,14 +254,7 @@ impl IggyShard {
}
let stream = Stream::create(id, name, self.config.system.clone(),
self.storage.clone());
- stream.persist().await?;
- info!("Created stream with ID: {id}, name: '{name}'.");
- self.streams_ids
- .borrow_mut()
- .insert(name.to_owned(), stream.stream_id);
- self.streams.borrow_mut().insert(stream.stream_id, stream);
- self.metrics.increment_streams(1);
- Ok(Identifier::numeric(id)?)
+ Ok(stream)
}
pub async fn update_stream(
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 252dae30..c247cf74 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -101,6 +101,52 @@ impl IggyShard {
Ok(Some(topic))
}
+ pub async fn create_topic_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: Option<u32>,
+ name: &str,
+ partitions_count: u32,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: Option<u8>,
+ ) -> Result<(), IggyError> {
+ let (stream_id, topic_id, partition_ids) = self
+ .create_topic_base(
+ stream_id,
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ )
+ .await?;
+ // TODO: Figure out a way how to distribute the shards table among
different shards,
+ // without the need to do code from below, everytime we handle a
`ShardEvent`.
+ // I think we shouldn't be sharing it, maintain a single shard table
per shard,
+ // but figure out a way how to distribute it smarter (maybe move the
broadcast inside of the `insert_shard_table_records`) method.
+ let records = partition_ids.into_iter().map(|partition_id| {
+ let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
+ // TODO: This setup isn't deterministic.
+ // Imagine a scenario where client creates partition using
`String` identifiers,
+ // but then for poll_messages requests uses numeric ones.
+ // the namespace wouldn't match, therefore we would get miss in
the shard table.
+ let hash = namespace.generate_hash();
+ let shard_id = hash % self.get_available_shards_count();
+ let shard_info = ShardInfo::new(shard_id as u16);
+ (namespace, shard_info)
+ });
+ self.insert_shard_table_records(records);
+
+ self.metrics.increment_topics(1);
+ self.metrics.increment_partitions(partitions_count);
+ self.metrics.increment_segments(partitions_count);
+ Ok(())
+ }
+
#[allow(clippy::too_many_arguments)]
pub async fn create_topic(
&self,
@@ -130,26 +176,18 @@ impl IggyShard {
})?;
}
- // TODO: Make create topic sync, and extract the storage persister out
of it
- // perform disk i/o outside of the borrow_mut of the stream.
- let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with ID: {stream_id}")
- })?;
- let stream_id = stream.stream_id;
- let (topic_id, partition_ids) = stream
- .create_topic(
+ let (stream_id, topic_id, partition_ids) = self
+ .create_topic_base(
+ stream_id,
topic_id,
name,
partitions_count,
message_expiry,
compression_algorithm,
max_topic_size,
- replication_factor.unwrap_or(1),
+ replication_factor,
)
- .await
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to create topic
with name: {name} in stream ID: {stream_id}")
- })?;
+ .await?;
let records = partition_ids.into_iter().map(|partition_id| {
let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
// TODO: This setup isn't deterministic.
@@ -170,6 +208,40 @@ impl IggyShard {
Ok(Identifier::numeric(topic_id)?)
}
+ async fn create_topic_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: Option<u32>,
+ name: &str,
+ partitions_count: u32,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: Option<u8>,
+ ) -> Result<(u32, u32, Vec<u32>), IggyError> {
+ // TODO: Make create topic sync, and extract the storage persister out
of it
+ // perform disk i/o outside of the borrow_mut of the stream.
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let (topic_id, partition_ids) = stream
+ .create_topic(
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor.unwrap_or(1),
+ )
+ .await
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to create topic
with name: {name} in stream ID: {stream_id}")
+ })?;
+ Ok((stream_id, topic_id, partition_ids))
+ }
+
#[allow(clippy::too_many_arguments)]
pub async fn update_topic(
&self,
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index f3b271a7..f0e87fc0 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -372,6 +372,21 @@ impl IggyShard {
Ok(())
}
+ pub fn login_user_event(
+ &self,
+ client_id: u32,
+ username: &str,
+ password: &str,
+ ) -> Result<(), IggyError> {
+ let active_sessions = self.active_sessions.borrow();
+ let session = active_sessions
+ .iter()
+ .find(|s| s.client_id == client_id)
+ .expect(format!("At this point session for {}, should exist.",
client_id).as_str());
+ self.login_user_with_credentials(username, Some(password),
Some(session))?;
+ Ok(())
+ }
+
pub fn login_user(
&self,
username: &str,
diff --git a/core/server/src/shard/tasks/messages.rs
b/core/server/src/shard/tasks/messages.rs
new file mode 100644
index 00000000..87660290
--- /dev/null
+++ b/core/server/src/shard/tasks/messages.rs
@@ -0,0 +1,38 @@
+use futures::StreamExt;
+use iggy_common::IggyError;
+use std::rc::Rc;
+use tracing::error;
+
+use crate::shard::{IggyShard, transmission::frame::ShardFrame};
+
+async fn run_shard_messages_receiver(shard: Rc<IggyShard>) -> Result<(),
IggyError> {
+ let mut messages_receiver = shard.messages_receiver.take().unwrap();
+ loop {
+ if let Some(frame) = messages_receiver.next().await {
+ let ShardFrame {
+ message,
+ response_sender,
+ } = frame;
+ match (shard.handle_shard_message(message).await, response_sender)
{
+ (Some(response), Some(response_sender)) => {
+ response_sender
+ .send(response)
+ .await
+ .expect("Failed to send response back to origin
shard.");
+ }
+ _ => {}
+ };
+ }
+ }
+}
+
+pub async fn spawn_shard_message_task(shard: Rc<IggyShard>) -> Result<(),
IggyError> {
+ monoio::spawn(async move {
+ let result = run_shard_messages_receiver(shard).await;
+ if let Err(err) = &result {
+ error!("Error running shard: {err}");
+ }
+ result
+ })
+ .await
+}
diff --git a/core/server/src/shard/tasks/mod.rs
b/core/server/src/shard/tasks/mod.rs
new file mode 100644
index 00000000..ba63992f
--- /dev/null
+++ b/core/server/src/shard/tasks/mod.rs
@@ -0,0 +1 @@
+pub mod messages;
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 45815a50..49794c3f 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -4,26 +4,27 @@ use iggy_common::{CompressionAlgorithm, Identifier,
IggyExpiry, MaxTopicSize};
use crate::streaming::clients::client_manager::Transport;
+#[derive(Debug)]
pub enum ShardEvent {
CreatedStream {
stream_id: Option<u32>,
- topic_id: Identifier
+ name: String,
},
//DeletedStream(Identifier),
//UpdatedStream(Identifier, String),
//PurgedStream(Identifier),
//CreatedPartitions(Identifier, Identifier, u32),
//DeletedPartitions(Identifier, Identifier, u32),
- CreatedTopic(
- Identifier,
- Option<u32>,
- String,
- u32,
- IggyExpiry,
- CompressionAlgorithm,
- MaxTopicSize,
- Option<u8>,
- ),
+ CreatedTopic {
+ stream_id: Identifier,
+ topic_id: Option<u32>,
+ name: String,
+ partitions_count: u32,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: Option<u8>,
+ },
//CreatedConsumerGroup(Identifier, Identifier, Option<u32>, String),
//DeletedConsumerGroup(Identifier, Identifier, Identifier),
/*
@@ -42,6 +43,7 @@ pub enum ShardEvent {
//CreatedUser(String, String, UserStatus, Option<Permissions>),
//DeletedUser(Identifier),
LoginUser {
+ client_id: u32,
username: String,
password: String,
},
@@ -53,8 +55,7 @@ pub enum ShardEvent {
//LoginWithPersonalAccessToken(String),
//StoredConsumerOffset(Identifier, Identifier, PollingConsumer, u64),
NewSession {
- user_id: u32,
- socket_addr: SocketAddr,
+ address: SocketAddr,
transport: Transport,
},
-}
\ No newline at end of file
+}
diff --git a/core/server/src/shard/transmission/frame.rs
b/core/server/src/shard/transmission/frame.rs
index ff519ab4..6539ec4c 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -18,13 +18,16 @@
use async_channel::Sender;
use iggy_common::IggyError;
-use
crate::{binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
shard::transmission::message::ShardMessage,
streaming::segments::IggyMessagesBatchSet};
+use crate::{
+ binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
+ shard::transmission::message::ShardMessage,
streaming::segments::IggyMessagesBatchSet,
+};
#[derive(Debug)]
pub enum ShardResponse {
PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
SendMessages,
- ShardEvent,
+ Event,
ErrorResponse(IggyError),
}
diff --git a/core/server/src/shard/transmission/message.rs
b/core/server/src/shard/transmission/message.rs
index 360e7a3c..aa196fde 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -1,4 +1,4 @@
-use std::rc::Rc;
+use std::{rc::Rc, sync::Arc};
use iggy_common::PollingStrategy;
@@ -19,29 +19,47 @@ use iggy_common::PollingStrategy;
* specific language governing permissions and limitations
* under the License.
*/
-use crate::{shard::system::messages::PollingArgs,
streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut,
session::Session}};
+use crate::{
+ shard::{system::messages::PollingArgs, transmission::event::ShardEvent},
+ streaming::{polling_consumer::PollingConsumer,
segments::IggyMessagesBatchMut},
+};
#[derive(Debug)]
pub enum ShardMessage {
Request(ShardRequest),
- Event(ShardEvent),
+ Event(Arc<ShardEvent>),
}
#[derive(Debug)]
-pub enum ShardEvent {
- NewSession(),
+pub struct ShardRequest {
+ pub stream_id: u32,
+ pub topic_id: u32,
+ pub partition_id: u32,
+ pub payload: ShardRequestPayload,
}
-#[derive(Debug)]
-pub enum ShardRequest {
- SendMessages {
+impl ShardRequest {
+ pub fn new(
stream_id: u32,
topic_id: u32,
partition_id: u32,
+ payload: ShardRequestPayload,
+ ) -> Self {
+ Self {
+ stream_id,
+ topic_id,
+ partition_id,
+ payload,
+ }
+ }
+}
+
+#[derive(Debug)]
+pub enum ShardRequestPayload {
+ SendMessages {
batch: IggyMessagesBatchMut,
},
PollMessages {
- partition_id: u32,
args: PollingArgs,
consumer: PollingConsumer,
count: u32,
@@ -54,8 +72,8 @@ impl From<ShardRequest> for ShardMessage {
}
}
-impl From<ShardEvent> for ShardMessage {
- fn from(event: ShardEvent) -> Self {
+impl From<Arc<ShardEvent>> for ShardMessage {
+ fn from(event: Arc<ShardEvent>) -> Self {
ShardMessage::Event(event)
}
}
diff --git a/core/server/src/shard/transmission/mod.rs
b/core/server/src/shard/transmission/mod.rs
index 4ed9bd1e..107bdcbd 100644
--- a/core/server/src/shard/transmission/mod.rs
+++ b/core/server/src/shard/transmission/mod.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-pub mod event;
pub mod connector;
+pub mod event;
pub mod frame;
pub mod message;
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index b845d877..e8eb6ee6 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -117,7 +117,6 @@ impl State for FileState {
)
})
.map_err(|_| IggyError::CannotReadFile)?;
- let file = IggyFile::new(file);
let file_size = file
.metadata()
.await
@@ -129,6 +128,8 @@ impl State for FileState {
})
.map_err(|_| IggyError::CannotReadFileMetadata)?
.len();
+
+ let file = IggyFile::new(file);
if file_size == 0 {
info!("State file is empty");
return Ok(Vec::new());
@@ -150,7 +151,8 @@ impl State for FileState {
.with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR}
index. {error}"))
.map_err(|_| IggyError::InvalidNumberEncoding)?;
total_size += 8;
- if entries_count > 0 && index != current_index + 1 {
+ // Greater than one, because one of the entries after a fresh
reboot is the default root user.
+ if entries_count > 1 && index != current_index + 1 {
error!(
"State file is corrupted, expected index: {}, got: {}",
current_index + 1,
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs
b/core/server/src/streaming/segments/indexes/index_reader.rs
index c373ce0f..240e62a1 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,7 +17,7 @@
*/
use super::IggyIndexesMut;
-use crate::streaming::utils::PooledBuffer;
+use crate::{io::file::IggyFile, streaming::utils::PooledBuffer};
use bytes::BytesMut;
use error_set::ErrContext;
use iggy_common::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView};
@@ -30,15 +30,14 @@ use std::{
atomic::{AtomicU64, Ordering},
},
};
-use tokio::fs::OpenOptions;
-use tokio::task::spawn_blocking;
+use monoio::fs::OpenOptions;
use tracing::{error, trace};
/// A dedicated struct for reading from the index file.
#[derive(Debug)]
pub struct IndexReader {
file_path: String,
- file: Arc<StdFile>,
+ file: IggyFile,
index_size_bytes: Arc<AtomicU64>,
}
@@ -52,13 +51,14 @@ impl IndexReader {
.with_error_context(|error| format!("Failed to open index file:
{file_path}. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
+ let file = IggyFile::new(file);
trace!(
"Opened index file for reading: {file_path}, size: {}",
index_size_bytes.load(Ordering::Acquire)
);
Ok(Self {
file_path: file_path.to_string(),
- file: Arc::new(file.into_std().await),
+ file,
index_size_bytes,
})
}
@@ -334,21 +334,19 @@ impl IndexReader {
len: u32,
use_pool: bool,
) -> Result<PooledBuffer, std::io::Error> {
- let file = self.file.clone();
- spawn_blocking(move || {
if use_pool {
let mut buf = PooledBuffer::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- file.read_exact_at(&mut buf, offset as u64)?;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
Ok(buf)
} else {
let mut buf = BytesMut::with_capacity(len as usize);
unsafe { buf.set_len(len as usize) };
- file.read_exact_at(&mut buf, offset as u64)?;
+ let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await;
+ result?;
Ok(PooledBuffer::from_existing(buf))
}
- })
- .await?
}
/// Gets the nth index from the index file.
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 71b18b6d..120bfe4e 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -19,21 +19,22 @@
use error_set::ErrContext;
use iggy_common::INDEX_SIZE;
use iggy_common::IggyError;
+use monoio::fs::OpenOptions;
+use monoio::io::AsyncWriteRentExt;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
-use tokio::{
- fs::{File, OpenOptions},
- io::AsyncWriteExt,
-};
use tracing::trace;
+use crate::io::file::IggyFile;
+use crate::streaming::utils::PooledBuffer;
+
/// A dedicated struct for writing to the index file.
#[derive(Debug)]
pub struct IndexWriter {
file_path: String,
- file: File,
+ file: IggyFile,
index_size_bytes: Arc<AtomicU64>,
fsync: bool,
}
@@ -55,6 +56,7 @@ impl IndexWriter {
.with_error_context(|error| format!("Failed to open index file:
{file_path}. {error}"))
.map_err(|_| IggyError::CannotReadFile)?;
+ let file = IggyFile::from(file);
if file_exists {
let _ = file.sync_all().await.with_error_context(|error| {
format!("Failed to fsync index file after creation:
{file_path}. {error}",)
@@ -86,16 +88,18 @@ impl IndexWriter {
}
/// Appends multiple index buffer to the index file in a single operation.
- pub async fn save_indexes(&mut self, indexes: &[u8]) -> Result<(),
IggyError> {
+ pub async fn save_indexes(&mut self, indexes: PooledBuffer) -> Result<(),
IggyError> {
if indexes.is_empty() {
return Ok(());
}
let count = indexes.len() / INDEX_SIZE;
+ let len = indexes.len();
self.file
.write_all(indexes)
.await
+ .0
.with_error_context(|error| {
format!(
"Failed to write {} indexes to file: {}. {error}",
@@ -105,7 +109,7 @@ impl IndexWriter {
.map_err(|_| IggyError::CannotSaveIndexToSegment)?;
self.index_size_bytes
- .fetch_add(indexes.len() as u64, Ordering::Release);
+ .fetch_add(len as u64, Ordering::Release);
if self.fsync {
let _ = self.fsync().await;
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs
b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 0dacb907..1b5e8e1e 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -221,9 +221,12 @@ impl IggyIndexesMut {
}
/// Gets the unsaved part of the index buffer
- pub fn unsaved_slice(&self) -> &[u8] {
+ pub fn unsaved_slice(&self) -> PooledBuffer {
let start_pos = self.saved_count as usize * INDEX_SIZE;
- &self.buffer[start_pos..]
+ // TODO: Dunno how to handle this better, maybe we should have a
`split` method,
+ // That splits the underlying Indexes buffer into two parts
+ // saved on disk and not saved yet.
+ PooledBuffer::from(&self.buffer[start_pos..])
}
/// Mark all indexes as saved to disk
diff --git a/core/server/src/streaming/segments/messages/mod.rs
b/core/server/src/streaming/segments/messages/mod.rs
index 512c3894..b79b1af0 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,71 +19,35 @@
mod messages_reader;
mod messages_writer;
-use crate::{io::file::IggyFile, to_iovec};
+use crate::{io::file::IggyFile};
use super::IggyMessagesBatchSet;
use error_set::ErrContext;
use iggy_common::IggyError;
-use monoio::io::AsyncWriteRent;
-use std::{io::IoSlice, mem::take};
+use monoio::io::AsyncWriteRentExt;
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
-use nix::libc::iovec;
/// Vectored write a batches of messages to file
async fn write_batch(
file: &mut IggyFile,
file_path: &str,
- batches: IggyMessagesBatchSet,
+ mut batches: IggyMessagesBatchSet,
) -> Result<usize, IggyError> {
- let mut slices = batches.iter().map(|b|
to_iovec(&b)).collect::<Vec<iovec>>();
+ //let mut slices = batches.iter().map(|b|
to_iovec(&b)).collect::<Vec<iovec>>();
let mut total_written = 0;
-
- loop {
- let (result, vomited) = file.writev(slices).await;
- slices = vomited;
- let bytes_written = result
- .with_error_context(|error| {
- format!("Failed to write messages to file: {file_path}, error:
{error}",)
- })
- .map_err(|_| IggyError::CannotWriteToFile)?;
-
- total_written += bytes_written;
- advance_slices(&mut slices.as_mut_slice(), bytes_written);
- if slices.is_empty() {
- break;
- }
+ // TODO: Fork monoio, piece of shit runtime.
+ for batch in batches.iter_mut() {
+ let messages = batch.take_messages();
+ let writen =
file.write_all(messages).await.0.with_error_context(|error| {
+ format!(
+ "Failed to write messages to file: {file_path}, error:
{error}",
+ )
+ // TODO: Better error variant.
+ }).map_err(|_| IggyError::CannotAppendMessage)?;
+ total_written += writen;
}
-
Ok(total_written)
}
-
-fn advance_slices(mut bufs: &mut [iovec], n: usize) {
- // Number of buffers to remove.
- let mut remove = 0;
- // Remaining length before reaching n. This prevents overflow
- // that could happen if the length of slices in `bufs` were instead
- // accumulated. Those slice may be aliased and, if they are large
- // enough, their added length may overflow a `usize`.
- let mut left = n;
- for buf in bufs.iter() {
- if let Some(remainder) = left.checked_sub(buf.iov_len as _) {
- left = remainder;
- remove += 1;
- } else {
- break;
- }
- }
-
- bufs = &mut bufs[remove..];
- if bufs.is_empty() {
- assert!(left == 0, "advancing io slices beyond their length");
- } else {
- unsafe {
- bufs[0].iov_len -= n;
- bufs[0].iov_base = bufs[0].iov_base.add(n);
- }
- }
-}
diff --git a/core/server/src/streaming/segments/segment.rs
b/core/server/src/streaming/segments/segment.rs
index 92dd254f..9887df90 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -313,7 +313,8 @@ impl Segment {
}
if let Some(index_writer) = self.index_writer.take() {
- tokio::spawn(async move {
+ //TODO: Fixme not sure whether we should spawn a task here.
+ monoio::spawn(async move {
let _ = index_writer.fsync().await;
drop(index_writer)
});
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index ea549291..6a76b582 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -264,6 +264,10 @@ impl IggyMessagesBatchMut {
(indexes, messages)
}
+ pub fn take_messages(&mut self) -> PooledBuffer {
+ std::mem::take(&mut self.messages)
+ }
+
/// Take the indexes from the batch
pub fn take_indexes(&mut self) -> IggyIndexesMut {
std::mem::take(&mut self.indexes)
diff --git a/core/server/src/streaming/segments/writing_messages.rs
b/core/server/src/streaming/segments/writing_messages.rs
index 9c385b02..8313f589 100644
--- a/core/server/src/streaming/segments/writing_messages.rs
+++ b/core/server/src/streaming/segments/writing_messages.rs
@@ -99,6 +99,7 @@ impl Segment {
self.last_index_position += saved_bytes.as_bytes_u64() as u32;
let unsaved_indexes_slice = self.indexes.unsaved_slice();
+ let len = unsaved_indexes_slice.len();
self.index_writer
.as_mut()
.expect("Index writer not initialized")
@@ -107,7 +108,7 @@ impl Segment {
.with_error_context(|error| {
format!(
"Failed to save index of {} indexes to {self}. {error}",
- unsaved_indexes_slice.len()
+ len
)
})?;
diff --git a/core/server/src/streaming/streams/storage.rs
b/core/server/src/streaming/streams/storage.rs
index ebd4946a..5bf4c554 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -20,7 +20,6 @@ use crate::state::system::StreamState;
use crate::streaming::storage::StreamStorage;
use crate::streaming::streams::COMPONENT;
use crate::streaming::streams::stream::Stream;
-use crate::streaming::topics::topic::CreatedTopicInfo;
use crate::streaming::topics::topic::Topic;
use ahash::AHashSet;
use error_set::ErrContext;
@@ -28,11 +27,11 @@ use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
use serde::{Deserialize, Serialize};
+use tokio::sync::Mutex;
use std::path::Path;
use std::sync::Arc;
-use tokio::fs;
-use tokio::fs::create_dir_all;
-use tokio::sync::Mutex;
+use monoio::fs;
+use monoio::fs::create_dir_all;
use tracing::{error, info, warn};
#[derive(Debug)]
@@ -52,13 +51,13 @@ impl StreamStorage for FileStreamStorage {
}
let mut unloaded_topics = Vec::new();
- let dir_entries = fs::read_dir(&stream.topics_path).await;
+ let dir_entries = std::fs::read_dir(&stream.topics_path);
if dir_entries.is_err() {
return Err(IggyError::CannotReadTopics(stream.stream_id));
}
let mut dir_entries = dir_entries.unwrap();
- while let Some(dir_entry) =
dir_entries.next_entry().await.unwrap_or(None) {
+ while let Some(dir_entry) =
dir_entries.next().transpose().unwrap_or(None) {
let name = dir_entry.file_name().into_string().unwrap();
let topic_id = name.parse::<u32>();
if topic_id.is_err() {
@@ -73,7 +72,8 @@ impl StreamStorage for FileStreamStorage {
error!(
"Topic with ID: '{topic_id}' for stream with ID:
'{stream_id}' was not found in state, but exists on disk and will be removed."
);
- if let Err(error) =
fs::remove_dir_all(&dir_entry.path()).await {
+ // TODO: Replace this with the dir walk impl that is mentioned
in the main function.
+ if let Err(error) = std::fs::remove_dir_all(&dir_entry.path())
{
error!("Cannot remove topic directory: {error}");
} else {
warn!(
@@ -151,6 +151,7 @@ impl StreamStorage for FileStreamStorage {
}
}
+ // TODO: Refactor....
let loaded_topics = Arc::new(Mutex::new(Vec::new()));
let mut load_topics = Vec::new();
for mut topic in unloaded_topics {
@@ -222,7 +223,7 @@ impl StreamStorage for FileStreamStorage {
async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
info!("Deleting stream with ID: {}...", stream.stream_id);
- if fs::remove_dir_all(&stream.path).await.is_err() {
+ if std::fs::remove_dir_all(&stream.path).is_err() {
return
Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
}
info!("Deleted stream with ID: {}.", stream.stream_id);
diff --git a/core/server/src/streaming/topics/messages.rs
b/core/server/src/streaming/topics/messages.rs
index 2f8f0557..f56ed560 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -25,8 +25,8 @@ use crate::streaming::utils::hash;
use ahash::AHashMap;
use error_set::ErrContext;
use iggy_common::locking::IggySharedMutFn;
-use iggy_common::{IggyTimestamp, PollingStrategy};
use iggy_common::{IggyError, IggyExpiry, Partitioning, PartitioningKind,
PollingKind};
+use iggy_common::{IggyTimestamp, PollingStrategy};
use std::sync::atomic::Ordering;
use tracing::trace;
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index 827ba6c9..1025dbd3 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -29,11 +29,11 @@ use futures::future::join_all;
use iggy_common::IggyError;
use iggy_common::locking::IggyRwLock;
use iggy_common::locking::IggySharedMutFn;
+use monoio::fs::create_dir_all;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;
-use tokio::fs;
-use tokio::fs::create_dir_all;
+use monoio::fs;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
@@ -61,14 +61,14 @@ impl TopicStorage for FileTopicStorage {
topic.compression_algorithm = state.compression_algorithm;
topic.replication_factor = state.replication_factor.unwrap_or(1);
- let mut dir_entries = fs::read_dir(&topic.partitions_path).await
+ let mut dir_entries = std::fs::read_dir(&topic.partitions_path)
.with_context(|| format!("Failed to read partition with ID: {} for
stream with ID: {} for topic with ID: {} and path: {}",
topic.topic_id, topic.stream_id,
topic.topic_id, &topic.partitions_path))
.map_err(|_| IggyError::CannotReadPartitions)?;
let mut unloaded_partitions = Vec::new();
- while let Some(dir_entry) =
dir_entries.next_entry().await.unwrap_or(None) {
- let metadata = dir_entry.metadata().await;
+ while let Some(dir_entry) =
dir_entries.next().transpose().unwrap_or(None) {
+ let metadata = dir_entry.metadata();
if metadata.is_err() || metadata.unwrap().is_file() {
continue;
}
@@ -88,7 +88,8 @@ impl TopicStorage for FileTopicStorage {
error!(
"Partition with ID: '{partition_id}' for stream with ID:
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but
exists on disk and will be removed."
);
- if let Err(error) =
fs::remove_dir_all(&dir_entry.path()).await {
+ // TODO: Replace this with the dir walk impl that is mentioned
in the main function.
+ if let Err(error) = std::fs::remove_dir_all(&dir_entry.path())
{
error!("Cannot remove partition directory: {error}");
} else {
warn!(
@@ -269,7 +270,7 @@ impl TopicStorage for FileTopicStorage {
async fn delete(&self, topic: &Topic) -> Result<(), IggyError> {
info!("Deleting topic {topic}...");
- if fs::remove_dir_all(&topic.path).await.is_err() {
+ if std::fs::remove_dir_all(&topic.path).is_err() {
return Err(IggyError::CannotDeleteTopicDirectory(
topic.topic_id,
topic.stream_id,
diff --git a/core/server/src/streaming/utils/memory_pool.rs
b/core/server/src/streaming/utils/memory_pool.rs
index 198e8626..16ad2087 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -155,7 +155,8 @@ impl MemoryPool {
/// Initialize the global pool from the given config.
pub fn init_pool(config: Arc<SystemConfig>) {
- let is_enabled = config.memory_pool.enabled;
+ // TODO: Fixme - the pool is force-disabled here; restore `config.memory_pool.enabled`.
+ let is_enabled = false;
let memory_limit = config.memory_pool.size.as_bytes_usize();
let bucket_capacity = config.memory_pool.bucket_capacity as usize;
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs
b/core/server/src/streaming/utils/pooled_buffer.rs
index 0aa1dd3a..a8664325 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -18,7 +18,7 @@
use super::memory_pool::{BytesMutExt, memory_pool};
use bytes::{Buf, BufMut, BytesMut};
-use monoio::buf::IoBufMut;
+use monoio::buf::{IoBuf, IoBufMut};
use std::ops::{Deref, DerefMut};
#[derive(Debug)]
@@ -230,3 +230,13 @@ unsafe impl IoBufMut for PooledBuffer {
unsafe { self.inner.set_init(pos) }
}
}
+
+unsafe impl IoBuf for PooledBuffer {
+ fn read_ptr(&self) -> *const u8 {
+ self.inner.read_ptr()
+ }
+
+ fn bytes_init(&self) -> usize {
+ self.inner.bytes_init()
+ }
+}
diff --git a/core/server/src/tcp/connection_handler.rs
b/core/server/src/tcp/connection_handler.rs
index 9c92d27d..5622d2a8 100644
--- a/core/server/src/tcp/connection_handler.rs
+++ b/core/server/src/tcp/connection_handler.rs
@@ -35,8 +35,14 @@ pub(crate) async fn handle_connection(
sender: &mut SenderKind,
shard: &Rc<IggyShard>,
) -> Result<(), ConnectionError> {
- let length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
- let code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+ let mut length_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+ unsafe {
+ length_buffer.set_len(INITIAL_BYTES_LENGTH);
+ }
+ let mut code_buffer = BytesMut::with_capacity(INITIAL_BYTES_LENGTH);
+ unsafe {
+ code_buffer.set_len(INITIAL_BYTES_LENGTH);
+ }
loop {
let (read_length, initial_buffer) = match
sender.read(length_buffer.clone()).await {
(Ok(read_length), initial_buffer) => (read_length, initial_buffer),
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 2af9f494..2f445d4f 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -119,14 +119,14 @@ where
status
);
let prefix = [
- libc::iovec {
- iov_base: length.as_ptr() as _,
- iov_len: length.len(),
- },
libc::iovec {
iov_base: status.as_ptr() as _,
iov_len: status.len(),
},
+ libc::iovec {
+ iov_base: length.as_ptr() as _,
+ iov_len: length.len(),
+ },
];
slices.splice(0..0, prefix);
stream
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index ac29e2b9..c0080edf 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,15 +18,16 @@
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
-use crate::shard::transmission::message::ShardEvent;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::clients::client_manager::Transport;
use crate::tcp::connection_handler::{handle_connection, handle_error};
use crate::tcp::tcp_socket;
+use iggy_common::IggyError;
use std::net::SocketAddr;
use std::rc::Rc;
use tracing::{error, info};
-pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) {
+pub async fn start(server_name: &'static str, shard: Rc<IggyShard>) ->
Result<(), IggyError> {
let ip_v6 = shard.config.tcp.ipv6;
let socket_config = &shard.config.tcp.socket;
let addr: SocketAddr = shard
@@ -40,8 +41,8 @@ pub async fn start(server_name: &'static str, shard:
Rc<IggyShard>) {
monoio::spawn(async move {
socket
.bind(&addr.into())
- .expect("Failed to bind eTCP listener");
- socket.listen(1024);
+ .expect("Failed to bind TCP listener");
+ socket.listen(1024).unwrap();
let listener: std::net::TcpListener = socket.into();
let listener = monoio::net::TcpListener::from_std(listener).unwrap();
info!("{server_name} server has started on: {:?}", addr);
@@ -50,15 +51,14 @@ pub async fn start(server_name: &'static str, shard:
Rc<IggyShard>) {
Ok((stream, address)) => {
let shard = shard.clone();
info!("Accepted new TCP connection: {address}");
- let session = shard.add_client(&address, Transport::Tcp);
+ let transport = Transport::Tcp;
+ let session = shard.add_client(&address, transport);
//TODO: Those can be shared with other shards.
shard.add_active_session(session.clone());
// Broadcast session to all shards.
- //TODO: Fixme
- /*
- let event =
Rc::new(ShardEvent::NewSession(session.clone()));
- shard.broadcast_event_to_all_shards(session.client_id,
event);
- */
+ let event = ShardEvent::NewSession { address, transport };
+ // TODO: Fixme - look inside the
`broadcast_event_to_all_shards` method.
+ let _responses =
shard.broadcast_event_to_all_shards(event.into());
let _client_id = session.client_id;
info!("Created new session: {session}");
@@ -85,5 +85,6 @@ pub async fn start(server_name: &'static str, shard:
Rc<IggyShard>) {
Err(error) => error!("Unable to accept TCP socket. {error}"),
}
}
- });
+ })
+ .await
}
diff --git a/core/server/src/tcp/tcp_server.rs
b/core/server/src/tcp/tcp_server.rs
index 9b8e1c04..0430f773 100644
--- a/core/server/src/tcp/tcp_server.rs
+++ b/core/server/src/tcp/tcp_server.rs
@@ -24,17 +24,18 @@ use tracing::info;
/// Starts the TCP server.
/// Returns the address the server is listening on.
-pub async fn start(shard: Rc<IggyShard>) -> Result<(), IggyError> {
+pub async fn spawn_tcp_server(shard: Rc<IggyShard>) -> Result<(), IggyError> {
let server_name = if shard.config.tcp.tls.enabled {
"Iggy TCP TLS"
} else {
"Iggy TCP"
};
info!("Initializing {server_name} server...");
- let addr = match shard.config.tcp.tls.enabled {
+ // TODO: Fixme -- storing addr of the server inside of the config for
integration tests...
+ let result = match shard.config.tcp.tls.enabled {
true => unimplemented!("TLS support is not implemented yet"),
false => tcp_listener::start(server_name, shard).await,
};
- info!("{server_name} server has started on: {:?}", addr);
- Ok(())
+ //info!("{server_name} server has started on: {:?}", addr);
+ result
}
diff --git a/core/server/src/tcp/tcp_socket.rs
b/core/server/src/tcp/tcp_socket.rs
index 747540fa..7ea3acf0 100644
--- a/core/server/src/tcp/tcp_socket.rs
+++ b/core/server/src/tcp/tcp_socket.rs
@@ -30,6 +30,15 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket
{
.expect("Unable to create an ipv4 socket")
};
+ // Required by the thread-per-core model...
+ // We create a bunch of sockets on different threads that bind to exactly
the same address and port.
+ socket
+ .set_reuse_address(true)
+ .expect("Unable to set SO_REUSEADDR on socket");
+ socket
+ .set_reuse_port(true)
+ .expect("Unable to set SO_REUSEPORT on socket");
+
if config.override_defaults {
config
.recv_buffer_size
@@ -54,12 +63,6 @@ pub fn build(ipv6: bool, config: &TcpSocketConfig) -> Socket
{
socket
.set_linger(Some(config.linger.get_duration()))
.expect("Unable to set SO_LINGER on socket");
- socket
- .set_reuse_address(true)
- .expect("Unable to set SO_REUSEADDR on socket");
- socket
- .set_reuse_port(true)
- .expect("Unable to set SO_REUSEPORT on socket");
}
socket