This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc_master_merge in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 98a9b11c0417d5d17f100a000f525b862f437d48 Merge: 5ee9b0825 7ef307c47 Author: numminex <[email protected]> AuthorDate: Mon Oct 27 21:12:55 2025 +0100 Merge branch 'master' into io_uring_tpc_master_merge .github/actions/csharp-dotnet/pre-merge/action.yml | 28 +- .github/actions/go/pre-merge/action.yml | 8 + .github/config/components.yml | 10 +- .github/workflows/_build_python_wheels.yml | 26 +- .github/workflows/_test_examples.yml | 9 +- .github/workflows/publish.yml | 4 +- .gitignore | 2 +- Cargo.lock | 412 +- Cargo.toml | 45 +- DEPENDENCIES.md | 374 +- README.md | 51 +- bdd/docker-compose.yml | 2 + bdd/go/tests/tcp_test/consumers_feature_create.go | 12 +- bdd/go/tests/tcp_test/consumers_feature_delete.go | 6 +- .../tests/tcp_test/consumers_feature_get_by_id.go | 6 +- bdd/go/tests/tcp_test/consumers_feature_join.go | 7 +- bdd/go/tests/tcp_test/consumers_feature_leave.go | 7 +- bdd/go/tests/tcp_test/consumers_steps.go | 4 +- bdd/go/tests/tcp_test/messages_feature_send.go | 9 +- bdd/go/tests/tcp_test/offset_feature_delete.go | 6 +- bdd/go/tests/tcp_test/partitions_feature_create.go | 5 +- bdd/go/tests/tcp_test/partitions_feature_delete.go | 5 +- bdd/go/tests/tcp_test/stream_feature_create.go | 7 +- bdd/go/tests/tcp_test/stream_feature_delete.go | 3 +- bdd/go/tests/tcp_test/stream_feature_get_by_id.go | 6 +- bdd/go/tests/tcp_test/stream_feature_update.go | 7 +- bdd/go/tests/tcp_test/stream_steps.go | 2 +- bdd/go/tests/tcp_test/test_shared_steps.go | 16 +- bdd/go/tests/tcp_test/topic_feature_create.go | 9 +- bdd/go/tests/tcp_test/topic_feature_delete.go | 4 +- bdd/go/tests/tcp_test/topic_feature_get_by_id.go | 4 +- bdd/go/tests/tcp_test/topic_feature_update.go | 9 +- bdd/go/tests/tcp_test/topic_steps.go | 5 +- bdd/go/tests/tcp_test/users_steps.go | 3 +- bdd/python/Dockerfile | 4 +- core/ai/mcp/Cargo.toml | 7 +- core/ai/mcp/README.md | 4 +- core/ai/mcp/config.toml | 8 +- core/ai/mcp/src/api.rs | 31 +- core/ai/mcp/src/configs.rs | 165 +- core/ai/mcp/src/error.rs | 2 - core/ai/mcp/src/main.rs | 45 +- core/ai/mcp/src/service/mod.rs | 9 +- core/ai/mcp/src/service/requests.rs | 12 +- core/ai/mcp/src/stream.rs | 11 +- core/bench/dashboard/server/Cargo.toml | 4 +- core/cli/Cargo.toml | 2 +- core/common/Cargo.toml | 4 +- core/common/src/configs/mod.rs | 777 ++++ core/common/src/lib.rs | 9 +- .../configuration/tcp_config/tcp_client_config.rs | 3 +- .../tcp_config/tcp_client_config_builder.rs | 3 +- core/common/src/utils/mod.rs | 1 + core/connectors/README.md | 4 +- core/connectors/runtime/Cargo.toml | 7 +- core/connectors/runtime/README.md | 12 +- core/connectors/runtime/config.toml | 144 +- .../runtime/{config.toml => example_config.toml} | 12 +- core/connectors/runtime/src/api/auth.rs | 9 +- core/connectors/runtime/src/api/config.rs | 65 +- core/connectors/runtime/src/api/mod.rs | 37 +- core/connectors/runtime/src/configs.rs | 223 +- core/connectors/runtime/src/context.rs | 15 +- core/connectors/runtime/src/main.rs | 23 +- core/connectors/runtime/src/stream.rs | 11 +- core/connectors/sdk/Cargo.toml | 2 +- core/connectors/sdk/src/lib.rs | 3 +- core/connectors/sdk/src/sink.rs | 9 +- core/connectors/sinks/postgres_sink/src/lib.rs | 10 + core/connectors/sinks/quickwit_sink/Cargo.toml | 2 +- core/connectors/sinks/quickwit_sink/src/lib.rs | 2 +- core/connectors/sources/postgres_source/Cargo.toml | 4 + core/connectors/sources/postgres_source/README.md | 31 +- core/connectors/sources/postgres_source/src/lib.rs | 199 +- core/integration/Cargo.toml | 12 +- core/integration/src/test_connectors_runtime.rs | 240 ++ core/integration/src/test_server.rs | 9 +- core/integration/tests/config_provider/mod.rs | 14 +- core/integration/tests/connectors/mod.rs | 142 + core/integration/tests/connectors/postgres/mod.rs | 60 + .../tests/connectors/postgres/postgres.toml | 51 +- .../tests/connectors/postgres/postgres_sink.rs | 13 +- core/integration/tests/mod.rs | 1 + core/sdk/src/tcp/tcp_client.rs | 12 +- core/server/Cargo.toml | 19 +- .../create_consumer_group_handler.rs | 2 +- .../delete_consumer_group_handler.rs | 4 +- .../consumer_groups/join_consumer_group_handler.rs | 2 +- .../leave_consumer_group_handler.rs | 2 +- .../delete_consumer_offset_handler.rs | 4 +- .../store_consumer_offset_handler.rs | 4 +- .../messages/flush_unsaved_buffer_handler.rs | 4 +- .../partitions/create_partitions_handler.rs | 4 +- .../partitions/delete_partitions_handler.rs | 4 +- .../create_personal_access_token_handler.rs | 4 +- .../delete_personal_access_token_handler.rs | 4 +- .../login_with_personal_access_token_handler.rs | 2 +- .../handlers/segments/delete_segments_handler.rs | 4 +- .../handlers/streams/delete_stream_handler.rs | 4 +- .../binary/handlers/streams/get_streams_handler.rs | 2 +- .../handlers/streams/purge_stream_handler.rs | 4 +- .../handlers/streams/update_stream_handler.rs | 4 +- .../binary/handlers/topics/delete_topic_handler.rs | 4 +- .../binary/handlers/topics/purge_topic_handler.rs | 6 +- .../binary/handlers/topics/update_topic_handler.rs | 2 +- .../handlers/users/change_password_handler.rs | 4 +- .../binary/handlers/users/delete_user_handler.rs | 4 +- .../binary/handlers/users/login_user_handler.rs | 2 +- .../binary/handlers/users/logout_user_handler.rs | 2 +- .../handlers/users/update_permissions_handler.rs | 2 +- .../binary/handlers/users/update_user_handler.rs | 4 +- core/server/src/configs/config_provider.rs | 442 -- core/server/src/configs/defaults.rs | 2 + core/server/src/configs/server.rs | 101 +- core/server/src/configs/validators.rs | 46 +- core/server/src/http/consumer_groups.rs | 2 +- core/server/src/http/consumer_offsets.rs | 6 +- core/server/src/http/jwt/json_web_token.rs | 2 +- core/server/src/http/jwt/jwt_manager.rs | 12 +- core/server/src/http/jwt/middleware.rs | 12 +- core/server/src/http/jwt/storage.rs | 10 +- core/server/src/http/system.rs | 2 +- core/server/src/quic/quic_sender.rs | 10 +- core/server/src/quic/quic_server.rs | 22 +- core/server/src/server_error.rs | 32 +- core/server/src/shard/system/clients.rs | 4 +- .../src/shard/system/personal_access_tokens.rs | 14 +- core/server/src/shard/system/storage.rs | 4 +- core/server/src/shard/transmission/id.rs | 25 + core/server/src/state/file.rs | 43 +- core/server/src/state/models.rs | 44 +- core/server/src/state/system.rs | 8 +- core/server/src/streaming/partitions/segments.rs | 2 +- core/server/src/streaming/persistence/persister.rs | 16 +- .../src/streaming/segments/indexes/index_reader.rs | 2 +- .../src/streaming/segments/indexes/index_writer.rs | 10 +- .../streaming/segments/messages/messages_reader.rs | 2 +- .../streaming/segments/messages/messages_writer.rs | 4 +- core/server/src/tcp/tcp_sender.rs | 2 +- core/server/src/tcp/tcp_tls_sender.rs | 2 +- {foreign => examples}/csharp/.editorconfig | 2 +- examples/csharp/Iggy_SDK.Examples.sln | 118 + examples/csharp/Iggy_SDK.Examples.sln.DotSettings | 301 ++ examples/csharp/README.md | 80 + .../Iggy_SDK.Examples.Basic.Consumer.csproj | 24 + .../Iggy_SDK.Examples.Basic.Consumer/Program.cs | 91 + .../Iggy_SDK.Examples.Basic.Consumer/Settings.cs | 21 +- .../appsettings.json | 10 + .../Iggy_SDK.Examples.Basic.Producer.csproj | 24 + .../Iggy_SDK.Examples.Basic.Producer/Program.cs | 91 + .../Iggy_SDK.Examples.Basic.Producer/Settings.cs | 23 +- .../appsettings.json | 12 + ...ggy_SDK.Examples.GettingStarted.Consumer.csproj | 15 +- .../Utils.cs | 124 + ...ggy_SDK.Examples.GettingStarted.Producer.csproj | 15 +- .../Program.cs | 28 +- .../Utils.cs | 143 + .../src/Iggy_SDK.Examples.Shared/ExampleHelpers.cs | 60 + .../Iggy_SDK.Examples.Shared.csproj | 11 + .../src/Iggy_SDK.Examples.Shared/Messages.cs | 115 + .../Iggy_SDK.Examples.Shared/MessagesGenerator.cs | 69 + ...gy_SDK.Examples.MessageEnvelope.Consumer.csproj | 19 + .../Program.cs | 27 +- .../Utils.cs | 152 + ...gy_SDK.Examples.MessageEnvelope.Producer.csproj | 19 + .../Program.cs | 28 +- .../Utils.cs | 151 + ...ggy_SDK.Examples.MessageHeaders.Consumer.csproj | 15 +- .../Program.cs | 27 +- .../Utils.cs | 153 + ...ggy_SDK.Examples.MessageHeaders.Producer.csproj | 15 +- .../Program.cs | 28 +- .../Utils.cs | 156 + .../Iggy_SDK.Examples.NewSdk.Consumer.csproj | 22 + .../Iggy_SDK.Examples.NewSdk.Consumer/Program.cs | 60 + .../Iggy_SDK.Examples.NewSdk.Consumer/Utils.cs | 97 + .../Iggy_SDK.Examples.NewSdk.Producer.csproj | 20 + .../Iggy_SDK.Examples.NewSdk.Producer/Program.cs | 47 + .../Iggy_SDK.Examples.NewSdk.Producer/Utils.cs | 110 + examples/rust/src/shared/args.rs | 12 +- foreign/cpp/tests/e2e/server.toml | 14 - foreign/csharp/.editorconfig | 2 +- foreign/csharp/Benchmarks/Program.cs | 30 +- foreign/csharp/Benchmarks/SendMessage.cs | 10 +- foreign/csharp/DEPENDENCIES.md | 14 +- foreign/csharp/Directory.Packages.props | 12 +- .../Iggy_SDK.Tests.BDD/Context/TestContext.cs | 34 +- .../BasicMessagingOperationsSteps.cs | 52 +- .../FetchMessagesTests.cs | 127 +- .../Fixtures/FetchMessagesFixture.cs | 90 +- .../Fixtures/FlushMessageFixture.cs | 20 +- .../Fixtures/IggyServerFixture.cs | 16 +- .../Fixtures/OffsetFixtures.cs | 20 +- .../Fixtures/PollMessagesFixture.cs | 80 - .../Helpers/NameIdHelpers.cs | 14 +- .../IggyConsumerTests.cs | 838 ++++ .../IggyPublisherTests.cs | 606 +++ .../Iggy_SDK.Tests.Integration.csproj | 2 +- .../Iggy_SDK.Tests.Integration/OffsetTests.cs | 33 + .../PollMessagesTests.cs | 62 - .../SendMessagesTests.cs | 68 +- .../Iggy_SDK.Tests.Integration/StreamsTests.cs | 49 +- .../Iggy_SDK.Tests.Integration/SystemTests.cs | 10 +- .../Iggy_SDK.Tests.Integration/TopicsTests.cs | 46 +- foreign/csharp/Iggy_SDK.sln | 12 - ...amConfigurator.cs => IggyClientConfigurator.cs} | 19 +- .../Configuration/MessageStreamConfigurator.cs | 49 - .../Iggy_SDK/Configuration/TlsConfiguration.cs | 4 +- .../ConnectionStream/TcpConnectionStream.cs | 8 +- .../Iggy_SDK/Consumers/AutoCommitMode.cs} | 36 +- .../ConsumerErrorEventArgs.cs} | 28 +- foreign/csharp/Iggy_SDK/Consumers/IDeserializer.cs | 54 + .../Iggy_SDK/Consumers/IggyConsumer.Logging.cs | 107 + foreign/csharp/Iggy_SDK/Consumers/IggyConsumer.cs | 401 ++ .../Iggy_SDK/Consumers/IggyConsumerBuilder.cs | 316 ++ .../Iggy_SDK/Consumers/IggyConsumerBuilderOfT.cs | 141 + .../Iggy_SDK/Consumers/IggyConsumerConfig.cs | 152 + .../csharp/Iggy_SDK/Consumers/IggyConsumerOfT.cs | 107 + .../MessageStatus.cs} | 28 +- .../csharp/Iggy_SDK/Consumers/ReceivedMessage.cs | 63 + .../Iggy_SDK/Contracts/MessageFetchRequest.cs | 4 +- .../csharp/Iggy_SDK/Contracts/MessageResponse.cs | 6 +- .../Iggy_SDK/Contracts/MessageResponseGeneric.cs | 4 +- .../csharp/Iggy_SDK/Contracts/PolledMessages.cs | 4 +- .../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs | 10 +- .../Iggy_SDK/Encryption/AesMessageEncryptor.cs | 128 + .../Encryption/IMessageEncryptor.cs} | 32 +- .../Exceptions/ConsumerGroupNotFoundException.cs | 18 + ...ption.cs => ConsumerNotInitializedException.cs} | 16 +- .../IggyInvalidStatusCodeException.cs} | 16 +- .../Exceptions/InvalidBaseAdressException.cs | 4 +- .../InvalidConsumerGroupNameException.cs | 14 +- ...tion.cs => PublisherNotInitializedException.cs} | 16 +- ...ressException.cs => StreamNotFoundException.cs} | 23 +- ...dressException.cs => TopicNotFoundException.cs} | 27 +- .../Iggy_SDK/Extensions/IggyClientExtenstion.cs | 44 + .../Iggy_SDK/Factory/HttpMessageStreamBuilder.cs | 78 - .../csharp/Iggy_SDK/Factory/IggyClientFactory.cs | 79 + .../Iggy_SDK/Factory/MessageStreamFactory.cs | 105 - .../Iggy_SDK/Factory/TcpMessageStreamBuilder.cs | 78 - foreign/csharp/Iggy_SDK/Identifier.cs | 22 +- foreign/csharp/Iggy_SDK/IggyClient/IIggyClient.cs | 4 +- .../csharp/Iggy_SDK/IggyClient/IIggyConsumer.cs | 30 +- .../Iggy_SDK/IggyClient/IIggyConsumerGroup.cs | 4 +- .../csharp/Iggy_SDK/IggyClient/IIggyPublisher.cs | 22 +- .../Implementations/HttpMessageStream.cs | 261 +- .../IggyClient/Implementations/TcpMessageStream.cs | 650 +-- foreign/csharp/Iggy_SDK/Iggy_SDK.csproj | 10 +- foreign/csharp/Iggy_SDK/Kinds/PollingStrategy.cs | 38 +- foreign/csharp/Iggy_SDK/Messages/Message.cs | 9 +- foreign/csharp/Iggy_SDK/Messages/MessageHeader.cs | 18 +- .../MessagesDispatcher/HttpMessageInvoker.cs | 70 - .../MessagesDispatcher/MessageSenderDispatcher.cs | 214 - .../MessagesDispatcher/TcpMessageInvoker.cs | 88 - .../BackgroundMessageProcessor.Logging.cs | 81 + .../Publishers/BackgroundMessageProcessor.cs | 262 ++ foreign/csharp/Iggy_SDK/Publishers/ISerializer.cs | 54 + .../Iggy_SDK/Publishers/IggyPublisher.Logging.cs | 139 + .../csharp/Iggy_SDK/Publishers/IggyPublisher.cs | 336 ++ .../Iggy_SDK/Publishers/IggyPublisherBuilder.cs | 425 ++ .../Iggy_SDK/Publishers/IggyPublisherBuilderOfT.cs | 141 + .../Iggy_SDK/Publishers/IggyPublisherConfig.cs | 257 ++ .../csharp/Iggy_SDK/Publishers/IggyPublisherOfT.cs | 116 + .../Publishers/MessageBatchFailedEventArgs.cs | 60 + .../Iggy_SDK/Publishers/PublisherErrorEventArgs.cs | 51 + .../Iggy_SDK/Utils/TcpMessageStreamHelpers.cs | 77 +- .../Iggy_SDK_Tests/ContractTests/TcpContract.cs | 4 +- .../Utils/Messages/MessageFactory.cs | 8 +- foreign/csharp/Iggy_Sample_Consumer/Program.cs | 227 -- .../Iggy_Sample_Producer/MessageGenerator.cs | 85 - foreign/csharp/Iggy_Sample_Producer/Program.cs | 224 - foreign/csharp/README.md | 6 +- .../binary_response_deserializer.go | 2 +- .../identifier_serializer_test.go | 7 +- foreign/go/contracts/identifier.go | 11 +- foreign/go/contracts/messages.go | 6 +- foreign/go/errors/constants.go | 65 - foreign/go/errors/errors.go | 293 +- foreign/go/errors/errors.yaml | 1111 +++++ foreign/go/errors/errors_gen.go | 3477 ++++++++++++++++ foreign/go/errors/errors_test.go | 68 +- foreign/go/errors/generator/main.go | 209 + foreign/go/go.mod | 1 + foreign/go/go.sum | 4 + foreign/go/tcp/tcp_consumer_group_managament.go | 9 +- foreign/go/tcp/tcp_core.go | 41 +- foreign/go/tcp/tcp_messaging.go | 6 +- foreign/go/tcp/tcp_stream_managament.go | 13 +- foreign/go/tcp/tcp_topic_managament.go | 23 +- foreign/go/tcp/tcp_user_managament.go | 2 +- foreign/java/BUILD_AND_TEST.md | 293 ++ foreign/java/examples/build.gradle.kts | 69 +- .../apache/iggy/async/AsyncConsumerExample.java | 159 + .../java/org/apache/iggy/async/AsyncProducer.java | 230 ++ .../iggy-connector-library/README.md | 72 + .../iggy-connector-library/build.gradle.kts | 113 + .../connector/config/IggyConnectionConfig.java | 228 ++ .../apache/iggy/connector/config/OffsetConfig.java | 153 + .../iggy/connector/error/ConnectorException.java | 94 + .../iggy/connector/partition/PartitionInfo.java | 123 + .../serialization/DeserializationSchema.java | 63 + .../connector/serialization/RecordMetadata.java | 100 + .../serialization/SerializationSchema.java | 65 + .../connector/serialization/TypeDescriptor.java | 88 + .../connector/config/IggyConnectionConfigTest.java | 221 + .../iggy/connector/config/OffsetConfigTest.java | 167 + .../connector/error/ConnectorExceptionTest.java | 168 + .../connector/partition/PartitionInfoTest.java | 157 + .../serialization/DeserializationSchemaTest.java | 171 + .../serialization/RecordMetadataTest.java | 151 + .../serialization/SerializationSchemaTest.java | 208 + .../serialization/TypeDescriptorTest.java | 178 + foreign/java/java-sdk/build.gradle.kts | 4 +- .../iggy/client/async/ConsumerGroupsClient.java | 52 + .../apache/iggy/client/async/MessagesClient.java | 127 + .../apache/iggy/client/async/StreamsClient.java | 53 + .../org/apache/iggy/client/async/TopicsClient.java | 102 + .../org/apache/iggy/client/async/UsersClient.java | 47 +- .../client/async/tcp/AsyncBytesDeserializer.java | 211 + .../client/async/tcp/AsyncBytesSerializer.java | 215 + .../iggy/client/async/tcp/AsyncIggyTcpClient.java | 349 ++ .../iggy/client/async/tcp/AsyncTcpConnection.java | 232 ++ .../client/async/tcp/ConsumerGroupsTcpClient.java | 88 + .../iggy/client/async/tcp/IggyFrameDecoder.java | 73 + .../iggy/client/async/tcp/MessagesTcpClient.java | 137 + .../iggy/client/async/tcp/StreamsTcpClient.java | 116 + .../iggy/client/async/tcp/TopicsTcpClient.java | 152 + .../iggy/client/async/tcp/UsersTcpClient.java | 86 + .../iggy/client/blocking/tcp/CommandCode.java | 2 +- .../iggy/client/blocking/tcp/IggyTcpClient.java | 226 ++ .../client/async/AsyncClientIntegrationTest.java | 410 ++ .../iggy/client/async/AsyncPollMessageTest.java | 336 ++ .../async/tcp/AsyncIggyTcpClientBuilderTest.java | 299 ++ .../blocking/tcp/IggyTcpClientBuilderTest.java | 243 ++ foreign/java/settings.gradle.kts | 7 + foreign/node/package-lock.json | 4 - foreign/node/src/examples/utils.ts | 14 - foreign/python/Cargo.toml | 11 +- foreign/python/apache_iggy.pyi | 5 + foreign/python/src/client.rs | 23 +- helm/charts/iggy/.helmignore | 23 + web/Dockerfile => helm/charts/iggy/Chart.yaml | 31 +- helm/charts/iggy/README.md | 94 + helm/charts/iggy/templates/NOTES.txt | 22 + helm/charts/iggy/templates/_helpers.tpl | 99 + helm/charts/iggy/templates/deployment.yaml | 200 + helm/charts/iggy/templates/hpa.yaml | 61 + helm/charts/iggy/templates/ingress.yaml | 143 + .../iggy/templates/root-user-credentials.yaml | 31 +- helm/charts/iggy/templates/service.yaml | 59 + .../charts/iggy/templates/serviceaccount.yaml | 30 +- .../charts/iggy/templates/servicemonitor.yaml | 52 +- helm/charts/iggy/values.yaml | 219 + scripts/run-bdd-tests.sh | 15 - scripts/run-csharp-examples-from-readme.sh | 210 + web/.eslintignore | 13 - web/.eslintrc.cjs | 51 - web/.prettierrc | 20 +- web/Dockerfile | 42 +- web/eslint.config.js | 75 + web/package-lock.json | 4280 +++++++++----------- web/package.json | 44 +- web/postcss.config.js | 8 +- web/src/app.html | 2 +- web/src/hooks.server.ts | 2 +- web/src/lib/actions/tooltip.ts | 5 +- web/src/lib/api/ApiSchema.ts | 2 +- web/src/lib/api/convertBigIntsToStrings.ts | 2 +- web/src/lib/api/handleFetchErrors.ts | 8 +- web/src/lib/components/AppToasts.svelte | 4 +- web/src/lib/components/Breadcrumbs.svelte | 13 +- web/src/lib/components/Button.svelte | 22 +- web/src/lib/components/Checkbox.svelte | 12 +- web/src/lib/components/Combobox.svelte | 36 +- .../components/DeleteButtonWithConfirmation.svelte | 2 +- .../components/DropdownMenu/DropdownMenu.svelte | 12 +- web/src/lib/components/Header.svelte | 12 +- web/src/lib/components/Icon.svelte | 2 +- web/src/lib/components/Input.svelte | 13 +- .../lib/components/Layouts/SettingsLayout.svelte | 6 +- web/src/lib/components/Listbox.svelte | 18 +- web/src/lib/components/Loader.svelte | 5 +- web/src/lib/components/LoadingIndicator.svelte | 2 +- web/src/lib/components/Logo/Logo.svelte | 4 +- web/src/lib/components/Logo/LogoMark.svelte | 243 +- .../MessageDecoder/MessageDecoder.svelte | 6 +- web/src/lib/components/ModalConfirmation.svelte | 13 +- .../components/Modals/AddPartitionsModal.svelte | 12 +- .../lib/components/Modals/AddStreamModal.svelte | 62 +- web/src/lib/components/Modals/AddTopicModal.svelte | 19 +- web/src/lib/components/Modals/AddUserModal.svelte | 13 +- web/src/lib/components/Modals/AppModals.svelte | 27 +- .../components/Modals/DeletePartitionsModal.svelte | 80 +- .../lib/components/Modals/InspectMessage.svelte | 34 +- web/src/lib/components/Modals/ModalBase.svelte | 12 +- .../components/Modals/StreamSettingsModal.svelte | 87 +- .../components/Modals/TopicSettingsModal.svelte | 22 +- web/src/lib/components/Navbar.svelte | 12 +- web/src/lib/components/Paginator.svelte | 3 +- web/src/lib/components/PasswordInput.svelte | 6 +- web/src/lib/components/PeriodicInvalidator.svelte | 18 +- web/src/lib/components/PermissionsManager.svelte | 176 +- .../RouteComponents/Settings/UsersTab.svelte | 44 +- .../Settings/UsersTabActions.svelte | 6 +- web/src/lib/components/Select.svelte | 17 +- web/src/lib/components/SlimSortableList.svelte | 24 +- web/src/lib/components/SortableList.svelte | 35 +- web/src/lib/components/StopPropagation.svelte | 3 +- web/src/lib/components/Toggler.svelte | 6 +- web/src/lib/domain/Partition.ts | 1 - web/src/lib/domain/Permissions.ts | 1 - web/src/lib/domain/Stream.ts | 2 - web/src/lib/domain/Topic.ts | 7 +- web/src/lib/types/utilTypes.ts | 8 +- web/src/lib/utils/constants/keys.ts | 110 +- web/src/lib/utils/constants/tokens.ts | 2 +- web/src/routes/api/proxy/+server.ts | 4 +- web/src/routes/auth/+layout.svelte | 2 +- web/src/routes/auth/sign-in/+page.server.ts | 10 +- web/src/routes/auth/sign-in/+page.svelte | 10 +- web/src/routes/dashboard/+layout.svelte | 2 +- web/src/routes/dashboard/overview/+page.svelte | 4 +- .../routes/dashboard/settings/server/+page.svelte | 4 +- .../routes/dashboard/settings/users/+page.svelte | 54 +- .../routes/dashboard/settings/webUI/+page.svelte | 7 +- web/src/routes/dashboard/streams/+layout.svelte | 16 +- .../dashboard/streams/[streamId=i32]/+page.svelte | 167 +- .../streams/[streamId=i32]/topics/+page.svelte | 3 +- .../topics/[topicId=i32]/+page.svelte | 176 +- .../partitions/[partitionId=i32]/+page.svelte | 3 +- .../[partitionId=i32]/messages/+page.svelte | 242 +- web/src/styles/app.css | 136 +- web/svelte.config.js | 20 +- web/tailwind.config.js | 68 - web/tsconfig.json | 2 +- web/vite.config.ts | 30 +- 436 files changed, 27298 insertions(+), 8612 deletions(-) diff --cc Cargo.lock index ca2334e0f,1179da095..ab94339e0 --- a/Cargo.lock +++ b/Cargo.lock @@@ -8,7 -8,7 +8,8 @@@ version = "0.5.2 source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" dependencies = [ + "bitflags 2.9.4", + "bitflags 2.10.0", "bytes", "futures-core", "futures-sink", @@@ -44,7 -44,7 +45,8 @@@ dependencies = "actix-service", "actix-utils", "actix-web", + "bitflags 2.9.4", + "bitflags 2.10.0", "bytes", "derive_more 2.0.1", "futures-core", @@@ -423,12 -417,6 +425,18 @@@ dependencies = "password-hash", ] +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" ++checksum = "3c3610892ee6e0cbce8ae2700349fcf8f98adb0dbfbee85aec3c9179d29cc072" ++dependencies = [ ++ "base64ct", ++ "blake2", ++ "cpufeatures", ++ "password-hash", ++] + [[package]] name = "arrayref" version = "0.3.9" @@@ -605,17 -575,19 +613,26 @@@ dependencies = "futures-lite", "pin-project", "thiserror 2.0.17", - "tokio", - "tokio-util", +] + +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", ] + [[package]] + name = "atoi" + version = "2.0.0" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" + dependencies = [ + "num-traits", + ] + [[package]] name = "atomic" version = "0.6.1" @@@ -956,11 -964,11 +973,11 @@@ checksum = "bef38d45163c2f1dde094a7dfd3 [[package]] name = "bitflags" -version = "2.10.0" +version = "2.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" dependencies = [ - "serde", + "serde_core", ] [[package]] @@@ -2428,19 -2257,30 +2445,39 @@@ dependencies = [[package]] name = "doc-comment" - version = "0.3.3" + version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + checksum = "780955b8b195a21ab8e4ac6b60dd1dbdcec1dc6c51c0617964b08c81785e12c9" + + [[package]] + name = "docker_credential" + version = "1.3.2" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" + dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", + ] + + [[package]] + name = "document-features" + version = "0.2.12" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" + dependencies = [ + "litrs", + ] +[[package]] +name = "document-features" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d" +dependencies = [ + "litrs", +] + [[package]] name = "dotenvy" version = "0.15.7" @@@ -2569,25 -2468,11 +2606,22 @@@ version = "1.0.2 source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b" +dependencies = [ + "serde", + "serde_core", + "typeid", +] + [[package]] name = "err_trail" - version = "0.8.5" + version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "cf171eed0f19d57286fabcc6634d5c85b1054921f531a44c962225bbb06190b4" - dependencies = [ - "tracing", - ] + checksum = "fd07fd8b9b9a8ed762d905184c29d418a88296c2cff36d934004dd93569ae443" [[package]] name = "errno" @@@ -3557,11 -3496,11 +3611,11 @@@ dependencies = [[package]] name = "halfbrown" - version = "0.3.0" + version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "aa2c385c6df70fd180bbb673d93039dbd2cd34e41d782600bdf6e1ca7bce39aa" + checksum = "0c7ed2f2edad8a14c8186b847909a41fbb9c3eafa44f88bd891114ed5019da09" dependencies = [ - "hashbrown 0.16.0", + "hashbrown 0.15.5", "serde", ] @@@ -3619,15 -3552,20 +3673,26 @@@ checksum = "9229cfe53dfd69f0609a49f6546 dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", + ] + + [[package]] + name = "hashbrown" + version = "0.16.0" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + [[package]] name = "hashlink" version = "0.10.0" @@@ -4567,9 -4508,9 +4643,9 @@@ dependencies = [[package]] name = "is_terminal_polyfill" - version = "1.70.1" + version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "itertools" @@@ -4893,16 -4846,6 +4981,16 @@@ dependencies = "vcpkg", ] +[[package]] - name = "libyml" - version = "0.0.5" ++name = "libsqlite3-sys" ++version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980" ++checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ - "anyhow", - "version_check", ++ "pkg-config", ++ "vcpkg", +] + [[package]] name = "libz-rs-sys" version = "0.5.2" @@@ -6823,29 -6842,17 +6915,50 @@@ dependencies = "uuid", ] +[[package]] +name = "rmcp-macros" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1827cd98dab34cade0513243c6fe0351f0f0b2c9d6825460bcf45b42804bdda0" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.106", +] + +[[package]] +name = "ron" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" ++checksum = "1fdad1258f7259fdc0f2dfc266939c82c3b5d1fd72bcde274d600cdc27e60243" +dependencies = [ + "base64 0.21.7", + "bitflags 2.9.4", + "serde", - "serde_derive", ++ "serde_json", ++ "sse-stream", ++ "thiserror 2.0.17", ++ "tokio", ++ "tokio-stream", ++ "tokio-util", ++ "tower-service", ++ "tracing", ++ "uuid", ++] ++ + [[package]] + name = "rmcp-macros" + version = "0.8.3" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "ede0589a208cc7ce81d1be68aa7e74b917fcd03c81528408bab0457e187dcd9b" + dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.108", ] [[package]] @@@ -7098,23 -7140,17 +7211,43 @@@ dependencies = "serde_json", ] +[[package]] +name = "schemars_derive" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.106", +] + +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" ++checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0" ++dependencies = [ ++ "chrono", ++ "dyn-clone", ++ "ref-cast", ++ "schemars_derive", ++ "serde", ++ "serde_json", ++] ++ + [[package]] + name = "schemars_derive" + version = "1.0.4" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "33d020396d1d138dc19f1165df7545479dcd58d93810dc5d646a16e55abefa80" + dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.108", + ] [[package]] name = "scopeguard" @@@ -7457,12 -7462,12 +7589,12 @@@ dependencies = "figment", "flume", "futures", + "hash32 1.0.0", "human-repr", "iggy_common", - "jsonwebtoken", + "jsonwebtoken 10.1.0", "lending-iterator", "mimalloc", - "mockall", "moka", "nix", "once_cell", @@@ -7570,11 -7571,10 +7702,11 @@@ checksum = "d66dc143e6b11c1eddc06d5c423 [[package]] name = "simd-json" - version = "0.15.1" + version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" - checksum = "c962f626b54771990066e5435ec8331d1462576cd2d1e62f24076ae014f92112" + checksum = "4255126f310d2ba20048db6321c81ab376f6a6735608bf11f0785c41f01f64e3" dependencies = [ + "getrandom 0.3.4", "halfbrown", "ref-cast", "serde", @@@ -7940,16 -7940,22 +8072,27 @@@ dependencies = ] [[package]] -name = "static_assertions" -version = "1.1.0" +name = "stringprep" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] + name = "stringprep" + version = "0.1.5" + source = "registry+https://github.com/rust-lang/crates.io-index" + checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" + dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", + ] + [[package]] name = "strsim" version = "0.11.1" @@@ -9807,14 -9900,13 +10036,13 @@@ dependencies = ] [[package]] -name = "xattr" -version = "1.6.1" +name = "yaml-rust2" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +checksum = "2462ea039c445496d8793d052e13787f2b90e750b833afee748e601c17621ed9" dependencies = [ - "arraydeque", - "encoding_rs", - "hashlink", + "libc", + "rustix", ] [[package]] diff --cc Cargo.toml index 8f9337063,9214191cb..247be4a56 --- a/Cargo.toml +++ b/Cargo.toml @@@ -95,8 -95,8 +95,9 @@@ dotenvy = "0.15.7 enum_dispatch = "0.3.13" env_logger = "0.11.8" figlet-rs = "0.1.5" + figment = { version = "0.10.19", features = ["toml", "env"] } flume = "0.11.1" +async-channel = "2.3.1" futures = "0.3.31" futures-util = "0.3.31" human-repr = "1.1.0" @@@ -114,12 -114,12 +115,10 @@@ mockall = "0.13.1 nonzero_lit = "0.1.2" once_cell = "1.21.3" passterm = "=2.0.1" --postcard = { version = "1.1.3", features = ["alloc"] } --predicates = "3.1.3" --quinn = "0.11.9" --rand = "0.9.2" - regex = "1.11.2" - reqwest = { version = "0.12.23", default-features = false, features = [ -regex = "1.12.2" -reqwest = { version = "0.12.24", default-features = false, features = [ ++quinn = "0.11.8" ++postcard = { version = "1.1.1", features = ["alloc"] } ++rand = "0.9.1" ++reqwest = { version = "0.12.20", default-features = false, features = [ "json", "rustls-tls", ] } @@@ -129,46 -129,24 +128,55 @@@ rust-s3 = { version = "0.37.0", default "tokio-rustls-tls", "tags", ] } +rustls = { version = "0.23.31", features = ["ring"] } +serde = { version = "1.0.225", features = ["derive", "rc"] } +serde_json = "1.0.145" +serde_with = { version = "3.14.0", features = ["base64", "macros"] } +serde_yml = "0.0.12" ++rust-s3 = { version = "0.37.0", default-features = false, features = [ ++ "tokio-rustls-tls", ++ "tags", ++] } + rustls = { version = "0.23.34", features = ["ring"] } + serde = { version = "1.0.228", features = ["derive", "rc"] } + serde_json = "1.0.145" + serde_with = { version = "3.15.1", features = ["base64", "macros"] } + serde_yaml_ng = "0.10.0" serial_test = "3.2.0" server = { path = "core/server" } -simd-json = { version = "0.17.0", features = ["serde_impl"] } +simd-json = { version = "0.15.1", features = ["serde_impl"] } strum = { version = "0.27.2", features = ["derive"] } strum_macros = "0.27.2" -sysinfo = "0.37.2" -tempfile = "3.23.0" +sysinfo = "0.37.0" +tempfile = "3.21.0" test-case = "3.3.1" -thiserror = "2.0.17" -tokio = { version = "1.48.0", features = ["full"] } -tokio-rustls = "0.26.4" +thiserror = "2.0.16" +tokio = { version = "1.47.1", features = ["full"] } +compio = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be", features = [ + "runtime", + "macros", + "io-uring", + "time", + "rustls", +] } +cyper = { git = "https://github.com/krishvishal/cyper.git", rev = "cd75e266df6ab0a9b9474eb7dda1735650d17db6", features = [ + "rustls", +], default-features = false } +cyper-axum = { git = "https://github.com/krishvishal/cyper", rev = "cd75e266df6ab0a9b9474eb7dda1735650d17db6" } +compio-net = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } +compio-quic = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } +compio-tls = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be", features = [ + "rustls", +] } +compio-io = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } +compio-ws = { git = "https://github.com/krishvishal/compio-ws", features = [ + "rustls", +] } +tungstenite = "0.27.0" +tokio-tungstenite = { version = "0.27", features = ["rustls-tls-webpki-roots"] } +tokio-rustls = "0.26.2" tokio-util = { version = "0.7.16", features = ["compat"] } -toml = "0.9.8" +toml = "0.9.5" tower-http = { version = "0.6.6", features = [ "add-extension", "cors", diff --cc DEPENDENCIES.md index 34a7bc6d2,503423dfc..983f95ab9 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@@ -258,16 -270,14 +270,14 @@@ futures-sink: 0.3.31, "Apache-2.0 OR MI futures-task: 0.3.31, "Apache-2.0 OR MIT", futures-timer: 3.0.3, "Apache-2.0 OR MIT", futures-util: 0.3.31, "Apache-2.0 OR MIT", - generator: 0.8.7, "Apache-2.0 OR MIT", - generic-array: 0.14.7, "MIT", + generic-array: 0.14.9, "MIT", getrandom: 0.2.16, "Apache-2.0 OR MIT", - getrandom: 0.3.3, "Apache-2.0 OR MIT", + getrandom: 0.3.4, "Apache-2.0 OR MIT", ghash: 0.5.1, "Apache-2.0 OR MIT", gherkin: 0.14.0, "Apache-2.0 OR MIT", - gimli: 0.31.1, "Apache-2.0 OR MIT", git2: 0.20.2, "Apache-2.0 OR MIT", --glob: 0.3.3, "Apache-2.0 OR MIT", -globset: 0.4.18, "MIT OR Unlicense", ++glob: 0.3.2, "Apache-2.0 OR MIT", +globset: 0.4.16, "MIT OR Unlicense", globwalk: 0.9.1, "MIT", gloo: 0.8.1, "Apache-2.0 OR MIT", gloo: 0.10.0, "Apache-2.0 OR MIT", diff --cc bdd/docker-compose.yml index 40fd2c7a6,ab85afff3..86bbc9fa9 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@@ -27,22 -27,10 +27,24 @@@ services PREBUILT_IGGY_CLI: ${IGGY_CLI_PATH:-target/debug/iggy} LIBC: glibc PROFILE: debug - command: ["--fresh"] + command: ["--fresh", "--with-default-root-credentials"] + cap_add: + - SYS_NICE + security_opt: + - seccomp:unconfined + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: ["CMD", "/usr/local/bin/iggy", "ping"] + interval: 1s + timeout: 3s + retries: 30 + start_period: 2s environment: + - IGGY_ROOT_USERNAME=iggy + - IGGY_ROOT_PASSWORD=iggy - RUST_LOG=info - IGGY_SYSTEM_PATH=local_data - IGGY_TCP_ADDRESS=0.0.0.0:8090 diff --cc core/integration/src/test_server.rs index 160094d26,372a44139..c1ebd0831 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@@ -356,48 -325,11 +354,51 @@@ impl TestServer sleep(Duration::from_millis(SLEEP_INTERVAL_MS)); continue; } - match file_config_provider.load_config().await { + match ServerConfig::file_config_provider(config_path.clone()) + .load_config() + .await + { Ok(config) => { + // Verify config contains fresh addresses, not stale defaults + // Default ports: TCP=8090, HTTP=3000, QUIC=8080, WebSocket=8092 + let tcp_port: u16 = config + .tcp + .address + .split(':') + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + let http_port: u16 = config + .http + .address + .split(':') + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + let quic_port: u16 = config + .quic + .address + .split(':') + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + let websocket_port: u16 = config + .websocket + .address + .split(':') + .nth(1) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + if tcp_port == 8090 + || http_port == 3000 + || quic_port == 8080 + || websocket_port == 8092 + { + sleep(Duration::from_millis(SLEEP_INTERVAL_MS)); + continue; + } + loaded_config = Some(config); break; } diff --cc core/integration/tests/mod.rs index 2adf4fd0c,2d36c1167..fb2156d8e --- a/core/integration/tests/mod.rs +++ b/core/integration/tests/mod.rs @@@ -25,8 -25,10 +25,9 @@@ use std::sync::atomic::{AtomicBool, Ord use std::sync::{Arc, Once}; use std::{panic, thread}; -mod archiver; mod cli; mod config_provider; + mod connectors; mod data_integrity; mod mcp; mod sdk; diff --cc core/server/Cargo.toml index b94bd732c,e655a7db1..b1ca3d829 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@@ -26,14 -26,15 +26,18 @@@ license = "Apache-2.0 normal = ["tracing-appender"] [package.metadata.cargo-machete] -ignored = ["rust-s3"] +ignored = ["rusty-s3"] + +[[bin]] +name = "iggy-server" +path = "src/main.rs" + [[bin]] + name = "iggy-server" + path = "src/main.rs" + [features] default = ["mimalloc"] -tokio-console = ["dep:console-subscriber", "tokio/tracing"] disable-mimalloc = [] mimalloc = ["dep:mimalloc"] @@@ -65,25 -56,23 +69,26 @@@ dashmap = { workspace = true derive_more = { workspace = true } dotenvy = { workspace = true } enum_dispatch = { workspace = true } - error_set = { version = "0.8.5", features = ["tracing"] } + err_trail = { version = "0.9.0" } + error_set = { version = "0.9.0" } figlet-rs = { workspace = true } - figment = { version = "0.10.19", features = ["toml", "env"] } + figment = { workspace = true } flume = { workspace = true } +async-channel = { workspace = true } futures = { workspace = true } human-repr = { workspace = true } iggy_common = { workspace = true } -jsonwebtoken = { version = "10.1.0", features = ["rust_crypto"] } +jsonwebtoken = "9.3.1" +socket2 = "0.6.0" lending-iterator = "0.1.7" +hash32 = "1.0.0" mimalloc = { workspace = true, optional = true } - moka = { version = "0.12.10", features = ["future"] } + moka = { version = "0.12.11", features = ["future"] } nix = { version = "0.30", features = ["fs"] } once_cell = "1.21.3" - opentelemetry = { version = "0.30.0", features = ["trace", "logs"] } - opentelemetry-appender-tracing = { version = "0.30.1", features = ["log"] } - opentelemetry-otlp = { version = "0.30.0", features = [ + opentelemetry = { version = "0.31.0", features = ["trace", "logs"] } + opentelemetry-appender-tracing = { version = "0.31.1", features = ["log"] } + opentelemetry-otlp = { version = "0.31.0", features = [ "logs", "trace", "grpc-tonic", @@@ -116,13 -106,14 +121,13 @@@ strum = { workspace = true sysinfo = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true } -tokio-rustls = { workspace = true } -tokio-util = { workspace = true } +compio = { workspace = true } toml = { workspace = true } tower-http = { workspace = true } +test-case = { workspace = true } tracing = { workspace = true } tracing-appender = { workspace = true } - tracing-opentelemetry = "0.31.0" + tracing-opentelemetry = "0.32.0" tracing-subscriber = { workspace = true } twox-hash = { workspace = true } ulid = "1.2.1" diff --cc core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 03a8a880b,5ada67ce6..c53c34f9e --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@@ -75,17 -80,11 +75,17 @@@ impl ServerCommandHandler for CreateCon }), ) .await - .with_error_context(|error| { + .with_error(|error| { format!( - "{COMPONENT} (error: {error}) - failed to apply create consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id: {group_id:?}, session: {session}" + "{COMPONENT} (error: {error}) - failed to apply create consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id: {cg_id}, session: {session}" ) })?; + let response = shard.streams.with_consumer_group_by_id( + &stream_id, + &topic_id, + &Identifier::numeric(cg_id as u32).unwrap(), + |(root, members)| mapper::map_consumer_group(root, members), + ); sender.send_ok_response(&response).await?; Ok(()) } diff --cc core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 024000242,5d4136a57..17d002d3d --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@@ -19,17 -19,13 +19,17 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_consumer_group::DeleteConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteConsumerGroup { @@@ -98,10 -68,9 +98,10 @@@ &EntryCommand::DeleteConsumerGroup(self), ) .await - .with_error_context(|error| { + .with_error(|error| { format!( - "{COMPONENT} (error: {error}) - failed to apply delete consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id: {group_id:?}, session: {session}" + "{COMPONENT} (error: {error}) - failed to apply delete consumer group for stream_id: {}, topic_id: {}, group_id: {cg_id}, session: {session}", + stream_id, topic_id ) })?; sender.send_empty_ok_response().await?; diff --cc core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index dc0af6031,c9544abe4..a018a9c3f --- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@@ -19,14 -19,12 +19,14 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::join_consumer_group::JoinConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for JoinConsumerGroup { diff --cc core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index 3b8013137,df429404f..e482627dd --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@@ -16,18 -16,15 +16,18 @@@ * under the License. */ +use super::COMPONENT; use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; -use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; +use crate::binary::sender::SenderKind; + +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::leave_consumer_group::LeaveConsumerGroup; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for LeaveConsumerGroup { diff --cc core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index c9850f921,2d0d7599c..c45a877b8 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@@ -20,13 -20,12 +20,13 @@@ use crate::binary::command::{BinaryServ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_consumer_offset::DeleteConsumerOffset; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for DeleteConsumerOffset { diff --cc core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index a53ef24ee,569e534a0..c41f3a6da --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@@ -22,10 -20,10 +22,10 @@@ use crate::binary::command::{BinaryServ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::store_consumer_offset::StoreConsumerOffset; use tracing::debug; diff --cc core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs index 90c204d16,980a99f11..aa483fd82 --- a/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs +++ b/core/server/src/binary/handlers/messages/flush_unsaved_buffer_handler.rs @@@ -19,12 -19,11 +19,12 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::messages::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::{FlushUnsavedBuffer, IggyError}; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for FlushUnsavedBuffer { @@@ -47,20 -46,13 +47,20 @@@ let topic_id = self.topic_id.clone(); let partition_id = self.partition_id; let fsync = self.fsync; - system - .flush_unsaved_buffer(session, stream_id, topic_id, partition_id, fsync) + + shard + .flush_unsaved_buffer( + user_id, + self.stream_id, + self.topic_id, + partition_id as usize, + fsync, + ) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to flush unsaved buffer for stream_id: {}, topic_id: {}, partition_id: {}, session: {}", - self.stream_id, self.topic_id, self.partition_id, session + stream_id, topic_id, partition_id, session ) })?; sender.send_empty_ok_response().await?; diff --cc core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 844c42d68,713c1ea44..537a03f35 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@@ -19,18 -19,13 +19,18 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; +use crate::streaming::{streams, topics}; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::create_partitions::CreatePartitions; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreatePartitions { diff --cc core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index d4947af72,7303a0a0b..1405cc65b --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@@ -19,16 -19,13 +19,16 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_partitions::DeletePartitions; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeletePartitions { diff --cc core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index 533890812,9b571a9b2..35e8866f4 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@@ -20,17 -20,15 +20,17 @@@ use crate::binary::command::{BinaryServ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for CreatePersonalAccessToken { diff --cc core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs index 9d88a5555,f9f7c3aab..188cea8bb --- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs @@@ -19,14 -19,13 +19,14 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeletePersonalAccessToken { diff --cc core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index 862297810,cae3f4720..8606fa99f --- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@@ -23,8 -21,9 +23,8 @@@ use crate::binary::handlers::utils::rec use crate::binary::mapper; use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind}; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::login_with_personal_access_token::LoginWithPersonalAccessToken; use tracing::{debug, instrument}; diff --cc core/server/src/binary/handlers/segments/delete_segments_handler.rs index f174dfad3,71b7d23a0..c01430a1d --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@@ -19,21 -19,13 +19,21 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::namespace::IggyNamespace; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ + ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, +}; use crate::state::command::EntryCommand; +use crate::streaming; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_segments::DeleteSegments; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteSegments { diff --cc core/server/src/binary/handlers/streams/delete_stream_handler.rs index 4f2130ac7,2baaa2efd..4d162bbf8 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@@ -19,17 -19,13 +19,17 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_stream::DeleteStream; +use std::rc::Rc; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteStream { diff --cc core/server/src/binary/handlers/streams/get_streams_handler.rs index a331b0732,044a34546..f6b192764 --- a/core/server/src/binary/handlers/streams/get_streams_handler.rs +++ b/core/server/src/binary/handlers/streams/get_streams_handler.rs @@@ -21,14 -21,12 +21,14 @@@ use crate::binary::handlers::streams::C use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; +use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::get_streams::GetStreams; +use std::rc::Rc; use tracing::debug; impl ServerCommandHandler for GetStreams { diff --cc core/server/src/binary/handlers/streams/purge_stream_handler.rs index 40d40c085,1ebd31928..3dd8c66b3 --- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs @@@ -19,16 -19,13 +19,16 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::purge_stream::PurgeStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for PurgeStream { diff --cc core/server/src/binary/handlers/streams/update_stream_handler.rs index d6ec338fd,95841e990..18268a090 --- a/core/server/src/binary/handlers/streams/update_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs @@@ -19,16 -19,13 +19,16 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::update_stream::UpdateStream; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for UpdateStream { diff --cc core/server/src/binary/handlers/topics/delete_topic_handler.rs index 4cd215af6,3a0b23993..2056fc959 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@@ -19,18 -19,13 +19,18 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; +use crate::streaming::streams; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_topic::DeleteTopic; +use std::rc::Rc; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteTopic { diff --cc core/server/src/binary/handlers/topics/purge_topic_handler.rs index 2d1a67ab3,895cc5c49..a77ba38a3 --- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs @@@ -19,14 -19,13 +19,14 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::purge_topic::PurgeTopic; +use std::rc::Rc; use tracing::{debug, instrument}; impl ServerCommandHandler for PurgeTopic { @@@ -40,16 -39,14 +40,16 @@@ sender: &mut SenderKind, _length: u32, session: &Session, - system: &SharedSystem, + shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - let system = system.read().await; - system + let topic_id = self.topic_id.clone(); + let stream_id = self.stream_id.clone(); + + shard .purge_topic(session, &self.stream_id, &self.topic_id) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to purge topic with id: {}, stream_id: {}", self.topic_id, self.stream_id diff --cc core/server/src/binary/handlers/users/change_password_handler.rs index aa84405b5,f572366ef..8c02b9c5d --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@@ -19,18 -19,14 +19,18 @@@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use crate::streaming::utils::crypto; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::change_password::ChangePassword; +use std::rc::Rc; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for ChangePassword { diff --cc core/server/src/binary/handlers/users/delete_user_handler.rs index c1e5a5478,9aa095958..6188411ae --- a/core/server/src/binary/handlers/users/delete_user_handler.rs +++ b/core/server/src/binary/handlers/users/delete_user_handler.rs @@@ -21,16 -19,13 +21,16 @@@ use std::rc::Rc use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::delete_user::DeleteUser; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for DeleteUser { diff --cc core/server/src/binary/handlers/users/login_user_handler.rs index 6c5b35d5b,04424e48a..26fe3c858 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@@ -20,14 -20,13 +20,14 @@@ use crate::binary::command::{BinaryServ use crate::binary::handlers::utils::receive_and_validate; use crate::binary::mapper; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::login_user::LoginUser; -use tracing::{debug, instrument}; +use std::rc::Rc; +use tracing::{debug, info, instrument, warn}; impl ServerCommandHandler for LoginUser { fn code(&self) -> u32 { diff --cc core/server/src/binary/handlers/users/logout_user_handler.rs index 251a66c99,f9d54db88..10e246ea1 --- a/core/server/src/binary/handlers/users/logout_user_handler.rs +++ b/core/server/src/binary/handlers/users/logout_user_handler.rs @@@ -21,14 -19,12 +21,14 @@@ use std::rc::Rc use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::logout_user::LogoutUser; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for LogoutUser { diff --cc core/server/src/binary/handlers/users/update_permissions_handler.rs index ebbc905c1,f842eca85..47d81d3ca --- a/core/server/src/binary/handlers/users/update_permissions_handler.rs +++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs @@@ -21,16 -19,13 +21,16 @@@ use std::rc::Rc use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::update_permissions::UpdatePermissions; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for UpdatePermissions { diff --cc core/server/src/binary/handlers/users/update_user_handler.rs index 628a06bd6,7a6f46e34..f5160dd41 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@@ -21,16 -19,13 +21,16 @@@ use std::rc::Rc use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler}; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::users::COMPONENT, sender::SenderKind}; + +use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; -use crate::streaming::systems::system::SharedSystem; use anyhow::Result; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::update_user::UpdateUser; +use tracing::info; use tracing::{debug, instrument}; impl ServerCommandHandler for UpdateUser { diff --cc core/server/src/configs/defaults.rs index 7aed1e606,c30211f23..bef09df78 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@@ -35,9 -35,10 +35,11 @@@ use crate::configs::system:: StateConfig, StreamConfig, SystemConfig, TopicConfig, }; use crate::configs::tcp::{TcpConfig, TcpTlsConfig}; +use crate::configs::websocket::{WebSocketConfig, WebSocketTlsConfig}; use iggy_common::IggyByteSize; use iggy_common::IggyDuration; + use iggy_common::TransportProtocol; + use std::str::FromStr; use std::sync::Arc; use std::time::Duration; diff --cc core/server/src/configs/server.rs index 83608bfcc,b7dcc56aa..245cf599f --- a/core/server/src/configs/server.rs +++ b/core/server/src/configs/server.rs @@@ -23,10 -24,18 +23,19 @@@ use crate::configs::http::HttpConfig use crate::configs::quic::QuicConfig; use crate::configs::system::SystemConfig; use crate::configs::tcp::TcpConfig; +use crate::configs::websocket::WebSocketConfig; use crate::server_error::ConfigError; use derive_more::Display; - use error_set::ErrContext; + use err_trail::ErrContext; + use figment::Metadata; + use figment::Profile; + use figment::Provider; + use figment::providers::Format; + use figment::providers::Toml; + use figment::value::Dict; + use iggy_common::ConfigProvider; + use iggy_common::CustomEnvProvider; + use iggy_common::FileConfigProvider; use iggy_common::IggyDuration; use iggy_common::Validatable; use serde::{Deserialize, Serialize}; diff --cc core/server/src/configs/validators.rs index ae6f037e5,7d64ec1d3..d5d97d1aa --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@@ -39,42 -38,26 +39,32 @@@ use tracing::error impl Validatable<ConfigError> for ServerConfig { fn validate(&self) -> Result<(), ConfigError> { - self.system - .memory_pool - .validate() - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to validate memory pool config") - })?; - self.data_maintenance - .validate() - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to validate data maintenance config") - })?; - self.personal_access_token - .validate() - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to validate personal access token config" - ) - })?; - self.system.segment.validate().with_error_context(|error| { + self.system.memory_pool.validate().with_error(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate memory pool config") + })?; + self.data_maintenance.validate().with_error(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate data maintenance config") + })?; + self.personal_access_token.validate().with_error(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to validate personal access token config" + ) + })?; + self.system.segment.validate().with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to validate segment config") })?; - self.system - .compression - .validate() - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to validate compression config") - })?; - self.telemetry.validate().with_error_context(|error| { + self.system.compression.validate().with_error(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate compression config") + })?; + self.telemetry.validate().with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to validate telemetry config") })?; + self.system + .sharding + .validate() + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to validate sharding config") + })?; let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), @@@ -213,10 -196,13 +203,10 @@@ impl Validatable<ConfigError> for Messa impl Validatable<ConfigError> for DataMaintenanceConfig { fn validate(&self) -> Result<(), ConfigError> { - self.archiver.validate().with_error(|error| { - format!("{COMPONENT} (error: {error}) - failed to validate archiver config") - })?; - self.messages.validate().with_error(|error| { + self.messages.validate().with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to validate messages maintenance config") })?; - self.state.validate().with_error_context(|error| { + self.state.validate().with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to validate state maintenance config") })?; Ok(()) diff --cc core/server/src/http/jwt/storage.rs index 3fa46367f,e7e73fa0e..f4923a930 --- a/core/server/src/http/jwt/storage.rs +++ b/core/server/src/http/jwt/storage.rs @@@ -22,9 -24,10 +22,9 @@@ use crate:: }; use ahash::AHashMap; use anyhow::Context; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use std::sync::Arc; -use tokio::io::AsyncReadExt; use tracing::{error, info}; #[derive(Debug)] @@@ -44,20 -47,33 +44,20 @@@ impl TokenStorage pub async fn load_all_revoked_access_tokens( &self, ) -> Result<Vec<RevokedAccessToken>, IggyError> { - let file = file::open(&self.path).await; - if file.is_err() { - info!("No revoked access tokens found to load."); - return Ok(vec![]); - } + // Check if file exists by trying to get metadata (equivalent to original file open check) + let file_size = match compio::fs::metadata(&self.path).await { + Err(_) => { + info!("No revoked access tokens found to load."); + return Ok(vec![]); + } + Ok(metadata) => metadata.len() as usize, + }; info!("Loading revoked access tokens from: {}", self.path); - let mut file = file.map_err(|error| { - error!("Cannot open revoked access tokens file: {error}"); - IggyError::CannotReadFile - })?; - let file_size = file - .metadata() - .await - .with_error(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to read file metadata, path: {}", - self.path - ) - }) - .map_err(|_| IggyError::CannotReadFileMetadata)? - .len() as usize; - let mut buffer = PooledBuffer::with_capacity(file_size); - buffer.put_bytes(0, file_size); - file.read_exact(&mut buffer) + + let buffer = compio::fs::read(&self.path) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to read file into buffer, path: {}", self.path @@@ -105,9 -110,9 +105,9 @@@ .with_context(|| "Failed to serialize revoked access tokens") .map_err(|_| IggyError::CannotSerializeResource)?; self.persister - .overwrite(&self.path, &bytes) + .overwrite(&self.path, bytes) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to overwrite file, path: {}", self.path @@@ -139,9 -144,9 +139,9 @@@ .with_context(|| "Failed to serialize revoked access tokens") .map_err(|_| IggyError::CannotSerializeResource)?; self.persister - .overwrite(&self.path, &bytes) + .overwrite(&self.path, bytes) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to overwrite file, path: {}", self.path diff --cc core/server/src/http/system.rs index 3741ec6af,be678ace3..da4990374 --- a/core/server/src/http/system.rs +++ b/core/server/src/http/system.rs @@@ -28,10 -28,10 +28,10 @@@ use axum::extract::{Path, State} use axum::http::{HeaderMap, header}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use axum::{Extension, Json, Router}; +use axum::{Extension, Json, Router, debug_handler}; use bytes::Bytes; use chrono::Local; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::Stats; use iggy_common::Validatable; use iggy_common::get_snapshot::GetSnapshot; diff --cc core/server/src/quic/quic_server.rs index e13cfcd41,1cd47a4f0..449e5c35f --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@@ -162,12 -72,9 +160,10 @@@ fn configure_quic(config: &QuicConfig) transport.datagram_send_buffer_size(config.datagram_send_buffer_size.as_bytes_u64() as usize); transport.max_concurrent_bidi_streams( VarInt::try_from(config.max_concurrent_bidi_streams) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - invalid bidi stream limit") - }) + .with_error(|error| format!("{COMPONENT} (error: {error}) - invalid bidi stream limit")) .map_err(|_| QuicError::TransportConfigError)?, ); + if !config.keep_alive_interval.is_zero() { transport.keep_alive_interval(Some(config.keep_alive_interval.get_duration())); } diff --cc core/server/src/server_error.rs index 14cfbccec,d60604450..70ce206b2 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@@ -16,27 -16,26 +16,27 @@@ * under the License. */ +use compio_quic::{ConnectionError as QuicConnectionError, ReadError, WriteError}; use error_set::error_set; -use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError}; +use rusty_s3::BucketError; use std::array::TryFromSliceError; -use tokio::io; +use std::io; error_set!( - ServerError = ConfigError || ArchiverError || ConnectionError || LogError || CompatError || QuicError; + ServerError := ConfigError || ArchiverError || ConnectionError || LogError || CompatError || QuicError - IoError = { + IoError := { #[display("IO error")] IoError(io::Error), #[display("Write error")] WriteError(WriteError), - #[display("Read to end error")] - ReadToEndError(ReadToEndError) - } + #[display("Read error")] + ReadToEndError(ReadError) + }; - ConfigError = { + ConfigError := { #[display("Invalid configuration provider: {}", provider_type)] InvalidConfigurationProvider { provider_type: String }, @@@ -60,22 -59,16 +60,22 @@@ #[display("Invalid S3 credentials")] InvalidS3Credentials, + #[display("HTTP request error: {0}")] + CyperError(cyper::Error), + + #[display("Invalid S3 Bucket configuration")] + BucketError(BucketError), + #[display("Cannot archive file: {}", file_path)] CannotArchiveFile { file_path: String }, - } || IoError; + } || IoError - ConnectionError = { + ConnectionError := { #[display("Connection error")] QuicConnectionError(QuicConnectionError), - } || IoError || CommonError; + } || IoError || CommonError - LogError = { + LogError := { #[display("Logging filter reload failure")] FilterReloadFailure, diff --cc core/server/src/shard/system/clients.rs index fe7831ec8,58bb56867..cce06384a --- a/core/server/src/shard/system/clients.rs +++ b/core/server/src/shard/system/clients.rs @@@ -71,27 -85,29 +71,27 @@@ impl IggyShard &self, session: &Session, client_id: u32, - ) -> Result<Option<IggySharedMut<Client>>, IggyError> { + ) -> Result<Option<Client>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_client(session.get_user_id()) - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - permission denied to get client with ID: {client_id} by user ID: {}", session.get_user_id() ) })?; - let client_manager = self.client_manager.read().await; - Ok(client_manager.try_get_client(client_id)) + Ok(self.client_manager.try_get_client(client_id)) } - pub async fn get_clients( - &self, - session: &Session, - ) -> Result<Vec<IggySharedMut<Client>>, IggyError> { + pub fn get_clients(&self, session: &Session) -> Result<Vec<Client>, IggyError> { self.ensure_authenticated(session)?; self.permissioner + .borrow() .get_clients(session.get_user_id()) - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to get clients by user ID {}", session.get_user_id() diff --cc core/server/src/shard/system/personal_access_tokens.rs index c75acd51d,72d589954..b053cdb99 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@@ -16,12 -16,12 +16,12 @@@ * under the License. */ +use super::COMPONENT; +use crate::shard::IggyShard; use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; -use crate::streaming::systems::COMPONENT; -use crate::streaming::systems::system::System; use crate::streaming::users::user::User; - use error_set::ErrContext; + use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; @@@ -63,10 -61,10 +61,10 @@@ impl IggyShard let user_id = session.get_user_id(); let identifier = user_id.try_into()?; { - let user = self.get_user(&identifier).with_error_context(|error| { + let user = self.get_user(&identifier).with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") })?; - let max_token_per_user = self.personal_access_token.max_tokens_per_user; + let max_token_per_user = self.config.personal_access_token.max_tokens_per_user; if user.personal_access_tokens.len() as u32 >= max_token_per_user { error!( "User with ID: {user_id} has reached the maximum number of personal access tokens: {max_token_per_user}.", diff --cc core/server/src/shard/system/storage.rs index 3e9fa1829,d8a724a79..a098e4927 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@@ -87,9 -83,9 +87,9 @@@ impl FileSystemInfoStorage .with_context(|| "Failed to serialize system info") .map_err(|_| IggyError::CannotSerializeResource)?; self.persister - .overwrite(&self.path, &data) + .overwrite(&self.path, data) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to overwrite file at path: {}", self.path diff --cc core/server/src/shard/transmission/id.rs index 8921dca02,000000000..0cbb6b96b mode 100644,000000..100644 --- a/core/server/src/shard/transmission/id.rs +++ b/core/server/src/shard/transmission/id.rs @@@ -1,43 -1,0 +1,68 @@@ ++<<<<<<<< HEAD:core/server/src/shard/transmission/id.rs +/* Licensed to the Apache Software Foundation (ASF) under one ++======== ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++>>>>>>>> master:foreign/node/src/examples/utils.ts + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + ++<<<<<<<< HEAD:core/server/src/shard/transmission/id.rs +use std::ops::Deref; + +// TODO: Maybe pad to cache line size? +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct ShardId { + id: u16, +} + +impl ShardId { + pub fn new(id: u16) -> Self { + Self { id } + } + + pub fn id(&self) -> u16 { + self.id + } +} + +impl Deref for ShardId { + type Target = u16; + + fn deref(&self) -> &Self::Target { + &self.id + } +} ++======== ++ ++import { Client } from '../index.js'; ++import { getIggyAddress } from '../tcp.sm.utils.js'; ++ ++ ++export const getClient = () => { ++ const [host, port] = getIggyAddress(); ++ const credentials = { username: 'iggy', password: 'iggy' }; ++ ++ const opt = { ++ transport: 'TCP' as const, ++ options: { host, port }, ++ credentials ++ }; ++ ++ return new Client(opt); ++}; ++>>>>>>>> master:foreign/node/src/examples/utils.ts diff --cc core/server/src/state/file.rs index cc6de3be1,d2859e5f3..46f930765 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@@ -80,13 -80,27 +80,14 @@@ impl FileState pub fn term(&self) -> u64 { self.term.load(Ordering::SeqCst) } -} -impl State for FileState { - async fn init(&self) -> Result<Vec<StateEntry>, IggyError> { - if !Path::new(&self.path).exists() { - info!("State file does not exist, creating a new one"); - self.persister - .overwrite(&self.path, &[]) - .await - .with_error(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to overwrite state file, path: {}", - self.path - ) - })?; - } + pub async fn init(&self) -> Result<Vec<StateEntry>, IggyError> { + assert!(Path::new(&self.path).exists()); - let entries = self.load_entries().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load entries") - })?; + let entries = self + .load_entries() + .await + .with_error(|error| format!("{COMPONENT} (error: {error}) - failed to load entries"))?; let entries_count = entries.len() as u64; self.entries_count.store(entries_count, Ordering::SeqCst); if entries_count == 0 { @@@ -140,14 -153,13 +141,14 @@@ let mut current_index = 0; let mut entries_count = 0; loop { - let index = reader + let index = cursor .read_u64_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} index. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 8; - if entries_count > 0 && index != current_index + 1 { + // Greater than one, because one of the entries after a fresh reboot is the default root user. + if entries_count > 1 && index != current_index + 1 { error!( "State file is corrupted, expected index: {}, got: {}", current_index + 1, @@@ -158,56 -170,54 +159,54 @@@ current_index = index; entries_count += 1; - let term = reader + let term = cursor .read_u64_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} term. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} term. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 8; - let leader_id = reader + let leader_id = cursor .read_u32_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} leader_id. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} leader_id. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 4; - let version = reader + let version = cursor .read_u32_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} version. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} version. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 4; - let flags = reader + let flags = cursor .read_u64_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} flags. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} flags. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 8; let timestamp = IggyTimestamp::from( - reader + cursor .read_u64_le() .await - .with_error_context(|error| { - format!("{FILE_STATE_PARSE_ERROR} timestamp. {error}") - }) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} timestamp. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?, ); total_size += 8; - let user_id = reader + let user_id = cursor .read_u32_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} user_id. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} user_id. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 4; - let checksum = reader + let checksum = cursor .read_u32_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} checksum. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} checksum. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 4; - let context_length = reader + let context_length = cursor .read_u32_le() .await - .with_error_context(|error| { + .with_error(|error| { format!("{FILE_STATE_PARSE_ERROR} context context_length. {error}") }) .map_err(|_| IggyError::InvalidNumberEncoding)? @@@ -222,18 -231,16 +221,16 @@@ .map_err(|_| IggyError::CannotReadFile)?; let context = context.freeze(); total_size += context_length as u64; - let code = reader + let code = cursor .read_u32_le() .await - .with_error_context(|error| format!("{FILE_STATE_PARSE_ERROR} code. {error}")) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} code. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)?; total_size += 4; - let mut command_length = reader + let mut command_length = cursor .read_u32_le() .await - .with_error_context(|error| { - format!("{FILE_STATE_PARSE_ERROR} command_length. {error}") - }) + .with_error(|error| format!("{FILE_STATE_PARSE_ERROR} command_length. {error}")) .map_err(|_| IggyError::InvalidNumberEncoding)? as usize; total_size += 4; @@@ -354,12 -361,11 +351,12 @@@ command, ); let bytes = entry.to_bytes(); + let len = bytes.len(); self.entries_count.fetch_add(1, Ordering::SeqCst); self.persister - .append(&self.path, &bytes) + .append(&self.path, bytes) .await - .with_error_context(|error| { + .with_error(|error| { format!( "{COMPONENT} (error: {error}) - failed to append state entry data to file, path: {}, data size: {}", self.path, diff --cc core/server/src/streaming/persistence/persister.rs index 8f5be92b0,4b917e715..8a960275f --- a/core/server/src/streaming/persistence/persister.rs +++ b/core/server/src/streaming/persistence/persister.rs @@@ -57,11 -68,29 +57,11 @@@ impl PersisterKind #[derive(Debug)] pub struct FilePersister; -#[derive(Debug)] -pub struct FileWithSyncPersister; - -impl Persister for FilePersister { - async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::append(path) +impl FilePersister { + pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), IggyError> { + let (mut file, position) = file::append(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to append to file: {path}") }) .map_err(|_| IggyError::CannotAppendToFile)?; @@@ -75,10 -103,10 +75,10 @@@ Ok(()) } - async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { + pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), IggyError> { let mut file = file::overwrite(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to overwrite file: {path}") }) .map_err(|_| IggyError::CannotOverwriteFile)?; @@@ -93,10 -119,10 +93,10 @@@ Ok(()) } - async fn delete(&self, path: &str) -> Result<(), IggyError> { - fs::remove_file(path) + pub async fn delete(&self, path: &str) -> Result<(), IggyError> { + remove_file(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to delete file: {path}") }) .map_err(|_| IggyError::CannotDeleteFile)?; @@@ -104,14 -130,11 +104,14 @@@ } } -impl Persister for FileWithSyncPersister { - async fn append(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { - let mut file = file::append(path) +#[derive(Debug)] +pub struct FileWithSyncPersister; + +impl FileWithSyncPersister { + pub async fn append<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), IggyError> { + let (mut file, position) = file::append(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to append to file: {path}") }) .map_err(|_| IggyError::CannotAppendToFile)?; @@@ -133,10 -155,10 +133,10 @@@ Ok(()) } - async fn overwrite(&self, path: &str, bytes: &[u8]) -> Result<(), IggyError> { + pub async fn overwrite<B: IoBuf>(&self, path: &str, bytes: B) -> Result<(), IggyError> { let mut file = file::overwrite(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to overwrite file: {path}") }) .map_err(|_| IggyError::CannotOverwriteFile)?; @@@ -159,10 -179,10 +159,10 @@@ Ok(()) } - async fn delete(&self, path: &str) -> Result<(), IggyError> { - fs::remove_file(path) + pub async fn delete(&self, path: &str) -> Result<(), IggyError> { + remove_file(path) .await - .with_error_context(|error| { + .with_error(|error| { format!("{COMPONENT} (error: {error}) - failed to delete file: {path}") }) .map_err(|_| IggyError::CannotDeleteFile)?; diff --cc core/server/src/streaming/segments/indexes/index_writer.rs index 4e77766c6,ca4c982ee..94e7a99aa --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@@ -49,11 -47,12 +49,11 @@@ impl IndexWriter file_exists: bool, ) -> Result<Self, IggyError> { let file = OpenOptions::new() - .write(true) - .append(true) .create(true) + .write(true) .open(file_path) .await - .with_error_context(|error| format!("Failed to open index file: {file_path}. {error}")) + .with_error(|error| format!("Failed to open index file: {file_path}. {error}")) .map_err(|_| IggyError::CannotReadFile)?; if file_exists {
