This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a5124c84 feat(csharp): add examples for C# SDK (#2178)
a5124c84 is described below
commit a5124c840209f0bfa8a8c78cc6ed52ee0c19e37e
Author: Łukasz Sobczak <[email protected]>
AuthorDate: Sat Sep 20 22:45:50 2025 +0200
feat(csharp): add examples for C# SDK (#2178)
---
.github/actions/csharp-dotnet/pre-merge/action.yml | 28 +--
.github/config/components.yml | 5 +-
.github/workflows/_test_examples.yml | 9 +-
examples/csharp/Iggy_SDK.Examples.sln | 101 +++++++++
examples/csharp/README.md | 80 ++++++++
.../Iggy_SDK.Examples.Basic.Consumer.csproj | 24 +++
.../Iggy_SDK.Examples.Basic.Consumer/Program.cs | 92 +++++++++
.../Iggy_SDK.Examples.Basic.Consumer/Settings.cs | 32 +++
.../appsettings.json | 10 +
.../Iggy_SDK.Examples.Basic.Producer.csproj | 24 +++
.../Iggy_SDK.Examples.Basic.Producer/Program.cs | 100 +++++++++
.../Iggy_SDK.Examples.Basic.Producer/Settings.cs | 34 +++
.../appsettings.json | 12 ++
...ggy_SDK.Examples.GettingStarted.Consumer.csproj | 15 +-
.../Program.cs | 37 ++++
.../Utils.cs | 118 +++++++++++
...ggy_SDK.Examples.GettingStarted.Producer.csproj | 15 +-
.../Program.cs | 38 ++++
.../Utils.cs | 145 +++++++++++++
.../src/Iggy_SDK.Examples.Shared/ExampleHelpers.cs | 66 ++++++
.../Iggy_SDK.Examples.Shared.csproj | 11 +
.../src/Iggy_SDK.Examples.Shared/Messages.cs | 115 +++++++++++
.../Iggy_SDK.Examples.Shared/MessagesGenerator.cs | 69 +++++++
...gy_SDK.Examples.MessageEnvelope.Consumer.csproj | 19 ++
.../Program.cs | 37 ++++
.../Utils.cs | 146 +++++++++++++
...gy_SDK.Examples.MessageEnvelope.Producer.csproj | 19 ++
.../Program.cs | 38 ++++
.../Utils.cs | 153 ++++++++++++++
...ggy_SDK.Examples.MessageHeaders.Consumer.csproj | 15 +-
.../Program.cs | 37 ++++
.../Utils.cs | 147 +++++++++++++
...ggy_SDK.Examples.MessageHeaders.Producer.csproj | 15 +-
.../Program.cs | 38 ++++
.../Utils.cs | 158 ++++++++++++++
foreign/csharp/Iggy_SDK.sln | 12 --
foreign/csharp/Iggy_Sample_Consumer/Program.cs | 227 ---------------------
.../Iggy_Sample_Producer/MessageGenerator.cs | 85 --------
foreign/csharp/Iggy_Sample_Producer/Program.cs | 224 --------------------
scripts/run-csharp-examples-from-readme.sh | 210 +++++++++++++++++++
40 files changed, 2153 insertions(+), 607 deletions(-)
diff --git a/.github/actions/csharp-dotnet/pre-merge/action.yml
b/.github/actions/csharp-dotnet/pre-merge/action.yml
index 9d50f33e..4c08ea34 100644
--- a/.github/actions/csharp-dotnet/pre-merge/action.yml
+++ b/.github/actions/csharp-dotnet/pre-merge/action.yml
@@ -47,36 +47,34 @@ runs:
- name: Restore dependencies
run: |
- cd foreign/csharp
- dotnet restore
+ dotnet restore foreign/csharp/Iggy_SDK.sln
+ dotnet restore examples/csharp/Iggy_SDK.Examples.sln
shell: bash
- name: Build
run: |
- cd foreign/csharp
- dotnet build --no-restore
+ dotnet build foreign/csharp/Iggy_SDK.sln --no-restore
+ dotnet build examples/csharp/Iggy_SDK.Examples.sln --no-restore
shell: bash
- name: Lint (Code Analysis)
if: inputs.task == 'lint'
run: |
- cd foreign/csharp
-
# Run code analysis
- dotnet build --no-restore /p:EnforceCodeStyleInBuild=true
/p:TreatWarningsAsErrors=false
+ dotnet build foreign/csharp/Iggy_SDK.sln --no-restore
/p:EnforceCodeStyleInBuild=true /p:TreatWarningsAsErrors=false
+ dotnet build examples/csharp/Iggy_SDK.Examples.sln --no-restore
/p:EnforceCodeStyleInBuild=true /p:TreatWarningsAsErrors=false
# TODO: make format check blocking (requires dotnet-format tool)
- dotnet format --verify-no-changes --verbosity diagnostic || true
+ dotnet format examples/csharp/Iggy_SDK.Examples.sln
--verify-no-changes --verbosity diagnostic || true
+ dotnet format foreign/csharp/Iggy_SDK.sln --verify-no-changes
--verbosity diagnostic || true
shell: bash
- name: Test
if: inputs.task == 'test'
run: |
- cd foreign/csharp
-
# Run unit tests
- dotnet test Iggy_SDK_Tests --no-build --verbosity normal
+ dotnet test foreign/csharp/Iggy_SDK_Tests --no-build --verbosity normal
shell: bash
@@ -91,9 +89,7 @@ runs:
env:
IGGY_SERVER_HOST: 127.0.0.1
run: |
- cd foreign/csharp
-
- dotnet test Iggy_SDK.Tests.Integration --no-build --verbosity normal
+ dotnet test foreign/csharp/Iggy_SDK.Tests.Integration --no-build
--verbosity normal
shell: bash
- name: Stop Iggy server
@@ -106,10 +102,8 @@ runs:
- name: Build Release
if: inputs.task == 'build'
run: |
- cd foreign/csharp
-
# Build in Release mode
- dotnet build -c Release --no-restore
+ dotnet build foreign/csharp/Iggy_SDK.sln -c Release --no-restore
# List build output
echo "Build output:"
diff --git a/.github/config/components.yml b/.github/config/components.yml
index f07d0506..b11c9909 100644
--- a/.github/config/components.yml
+++ b/.github/config/components.yml
@@ -168,6 +168,7 @@ components:
- "ci-infrastructure" # CI changes trigger full regression
paths:
- "foreign/csharp/**"
+ - "examples/csharp/**"
tasks: ["lint", "test", "build", "e2e"]
# Individual BDD tests per SDK - only run when specific SDK changes
@@ -230,12 +231,14 @@ components:
- "rust-sdk"
- "rust-server"
- "sdk-go"
+ - "sdk-csharp"
- "ci-infrastructure" # CI changes trigger full regression
paths:
- "examples/**"
- "scripts/run-rust-examples-from-readme.sh"
- "scripts/run-go-examples-from-readme.sh"
- tasks: ["examples-rust", "examples-go"]
+ - "scripts/run-csharp-examples-from-readme.sh"
+ tasks: ["examples-rust", "examples-go", "examples-csharp"]
web-ui:
paths:
diff --git a/.github/workflows/_test_examples.yml
b/.github/workflows/_test_examples.yml
index 5f23aa99..89e84714 100644
--- a/.github/workflows/_test_examples.yml
+++ b/.github/workflows/_test_examples.yml
@@ -50,7 +50,7 @@ jobs:
if: inputs.component == 'examples-suite'
uses: ./.github/actions/utils/setup-rust-with-cache
with:
- cache-targets: false # Only cache registry and git deps, not target
dir (sccache handles that)
+ cache-targets: false # Only cache registry and git deps, not target
dir (sccache handles that)
- name: Build common binaries for all examples
if: inputs.component == 'examples-suite'
@@ -105,6 +105,13 @@ jobs:
# Run the examples script which will use the prebuilt server binary
./scripts/run-go-examples-from-readme.sh
+ - name: Run Csharp examples
+ if: inputs.component == 'examples-suite' && inputs.task ==
'examples-csharp'
+ run: |
+ echo "Running Csharp examples tests..."
+ # Run the examples script which will use the prebuilt server binary
+ ./scripts/run-csharp-examples-from-readme.sh
+
- name: Upload reports
if: always()
uses: actions/upload-artifact@v4
diff --git a/examples/csharp/Iggy_SDK.Examples.sln
b/examples/csharp/Iggy_SDK.Examples.sln
new file mode 100644
index 00000000..fcf6d556
--- /dev/null
+++ b/examples/csharp/Iggy_SDK.Examples.sln
@@ -0,0 +1,101 @@
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.31903.59
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src",
"{3576C181-8BD5-4845-9CDA-835B33A29A29}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.Basic.Consumer",
"src\Basic\Iggy_SDK.Examples.Basic.Consumer\Iggy_SDK.Examples.Basic.Consumer.csproj",
"{D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_SDK",
"..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj",
"{3E6DEB6E-88D8-46B5-BD0A-D35B3E2A35A6}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.Shared",
"src\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj",
"{6CA0C169-109F-4A43-A21F-0792C1F865C8}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.Basic.Producer",
"src\Basic\Iggy_SDK.Examples.Basic.Producer\Iggy_SDK.Examples.Basic.Producer.csproj",
"{5DBD2AE2-B68A-4072-8740-839EA19E400A}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.GettingStarted.Consumer",
"src\GettingStarted\Iggy_SDK.Examples.GettingStarted.Consumer\Iggy_SDK.Examples.GettingStarted.Consumer.csproj",
"{2E3377E5-7193-4D61-9E24-3B43E73F930D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.GettingStarted.Producer",
"src\GettingStarted\Iggy_SDK.Examples.GettingStarted.Producer\Iggy_SDK.Examples.GettingStarted.Producer.csproj",
"{F2B5BC7A-B273-4167-B794-13BE49C991D5}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Basic", "Basic",
"{45A127B4-66FC-4EF2-9282-A0129A4CA83F}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "GettingStarted",
"GettingStarted", "{3C1E95B5-59C2-43FC-9221-03489EE11B99}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MessageEnvelope",
"MessageEnvelope", "{BFFEC58A-0DA4-4360-8830-EE2A54027435}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.MessageEnvelope.Producer",
"src\MessageEnvelope\Iggy_SDK.Examples.MessageEnvelope.Producer\Iggy_SDK.Examples.MessageEnvelope.Producer.csproj",
"{CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.MessageEnvelope.Consumer",
"src\MessageEnvelope\Iggy_SDK.Examples.MessageEnvelope.Consumer\Iggy_SDK.Examples.MessageEnvelope.Consumer.csproj",
"{07EA83ED-2FB0-4E08-A9EF-8A740DDB358A}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MessageHeaders",
"MessageHeaders", "{578335B4-7B95-41D5-BB73-E965CAB2DBA4}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.MessageHeaders.Consumer",
"src\MessageHeaders\Iggy_SDK.Examples.MessageHeaders.Consumer\Iggy_SDK.Examples.MessageHeaders.Consumer.csproj",
"{0A09EEAA-DDAE-44D3-926C-88910D661901}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK.Examples.MessageHeaders.Producer",
"src\MessageHeaders\Iggy_SDK.Examples.MessageHeaders.Producer\Iggy_SDK.Examples.MessageHeaders.Producer.csproj",
"{FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C}.Release|Any CPU.Build.0
= Release|Any CPU
+ {3E6DEB6E-88D8-46B5-BD0A-D35B3E2A35A6}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {3E6DEB6E-88D8-46B5-BD0A-D35B3E2A35A6}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {3E6DEB6E-88D8-46B5-BD0A-D35B3E2A35A6}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {3E6DEB6E-88D8-46B5-BD0A-D35B3E2A35A6}.Release|Any CPU.Build.0
= Release|Any CPU
+ {6CA0C169-109F-4A43-A21F-0792C1F865C8}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {6CA0C169-109F-4A43-A21F-0792C1F865C8}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {6CA0C169-109F-4A43-A21F-0792C1F865C8}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {6CA0C169-109F-4A43-A21F-0792C1F865C8}.Release|Any CPU.Build.0
= Release|Any CPU
+ {5DBD2AE2-B68A-4072-8740-839EA19E400A}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {5DBD2AE2-B68A-4072-8740-839EA19E400A}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {5DBD2AE2-B68A-4072-8740-839EA19E400A}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {5DBD2AE2-B68A-4072-8740-839EA19E400A}.Release|Any CPU.Build.0
= Release|Any CPU
+ {2E3377E5-7193-4D61-9E24-3B43E73F930D}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {2E3377E5-7193-4D61-9E24-3B43E73F930D}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {2E3377E5-7193-4D61-9E24-3B43E73F930D}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {2E3377E5-7193-4D61-9E24-3B43E73F930D}.Release|Any CPU.Build.0
= Release|Any CPU
+ {F2B5BC7A-B273-4167-B794-13BE49C991D5}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {F2B5BC7A-B273-4167-B794-13BE49C991D5}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {F2B5BC7A-B273-4167-B794-13BE49C991D5}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {F2B5BC7A-B273-4167-B794-13BE49C991D5}.Release|Any CPU.Build.0
= Release|Any CPU
+ {CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC}.Release|Any CPU.Build.0
= Release|Any CPU
+ {07EA83ED-2FB0-4E08-A9EF-8A740DDB358A}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {07EA83ED-2FB0-4E08-A9EF-8A740DDB358A}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {07EA83ED-2FB0-4E08-A9EF-8A740DDB358A}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {07EA83ED-2FB0-4E08-A9EF-8A740DDB358A}.Release|Any CPU.Build.0
= Release|Any CPU
+ {0A09EEAA-DDAE-44D3-926C-88910D661901}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {0A09EEAA-DDAE-44D3-926C-88910D661901}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {0A09EEAA-DDAE-44D3-926C-88910D661901}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {0A09EEAA-DDAE-44D3-926C-88910D661901}.Release|Any CPU.Build.0
= Release|Any CPU
+ {FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45}.Release|Any CPU.Build.0
= Release|Any CPU
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {6CA0C169-109F-4A43-A21F-0792C1F865C8} =
{3576C181-8BD5-4845-9CDA-835B33A29A29}
+ {45A127B4-66FC-4EF2-9282-A0129A4CA83F} =
{3576C181-8BD5-4845-9CDA-835B33A29A29}
+ {3C1E95B5-59C2-43FC-9221-03489EE11B99} =
{3576C181-8BD5-4845-9CDA-835B33A29A29}
+ {D25FEFAD-7A4A-4617-91E3-1B0AB9737A7C} =
{45A127B4-66FC-4EF2-9282-A0129A4CA83F}
+ {5DBD2AE2-B68A-4072-8740-839EA19E400A} =
{45A127B4-66FC-4EF2-9282-A0129A4CA83F}
+ {2E3377E5-7193-4D61-9E24-3B43E73F930D} =
{3C1E95B5-59C2-43FC-9221-03489EE11B99}
+ {F2B5BC7A-B273-4167-B794-13BE49C991D5} =
{3C1E95B5-59C2-43FC-9221-03489EE11B99}
+ {BFFEC58A-0DA4-4360-8830-EE2A54027435} =
{3576C181-8BD5-4845-9CDA-835B33A29A29}
+ {CD8B7C6D-E1AA-4A1A-9566-6602FF2C14AC} =
{BFFEC58A-0DA4-4360-8830-EE2A54027435}
+ {07EA83ED-2FB0-4E08-A9EF-8A740DDB358A} =
{BFFEC58A-0DA4-4360-8830-EE2A54027435}
+ {578335B4-7B95-41D5-BB73-E965CAB2DBA4} =
{3576C181-8BD5-4845-9CDA-835B33A29A29}
+ {0A09EEAA-DDAE-44D3-926C-88910D661901} =
{578335B4-7B95-41D5-BB73-E965CAB2DBA4}
+ {FBA6F7E6-D92E-4E8F-81F0-625AE60F7E45} =
{578335B4-7B95-41D5-BB73-E965CAB2DBA4}
+ EndGlobalSection
+EndGlobal
+
diff --git a/examples/csharp/README.md b/examples/csharp/README.md
new file mode 100644
index 00000000..ea7d6c0b
--- /dev/null
+++ b/examples/csharp/README.md
@@ -0,0 +1,80 @@
+# Iggy Examples
+
+This directory contains comprehensive sample applications that showcase
various usage patterns of the Iggy client SDK. To learn more about building
applications with Iggy, please refer to the [getting
started](https://iggy.apache.org/docs/introduction/getting-started) guide.
+
+## Running Examples
+
+To run any example, first start the server with `cargo run --bin iggy-server`
and then run the desired example.
+
+For server configuration options and help:
+
+```bash
+cargo run --bin iggy-server -- --help
+```
+
+You can also customize the server using environment variables:
+
+```bash
+## Example: Enable HTTP transport and set custom address
+IGGY_HTTP_ENABLED=true IGGY_TCP_ADDRESS=0.0.0.0:8090 cargo run --bin
iggy-server
+```
+
+## Basic Examples
+
+### Getting Started
+
+Perfect introduction for newcomers to Iggy:
+
+```bash
+dotnet run --project
examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer
+dotnet run --project
examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer
+```
+
+These examples use IIggyClient with TCP transport and demonstrate stream/topic
creation with basic message handling.
+
+### Basic Usage
+
+Core functionality with detailed configuration options:
+
+```bash
+dotnet run --project
examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer
+dotnet run --project
examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer
+```
+
+Demonstrates fundamental client connection, authentication, batch message
sending, and polling with support for TCP/QUIC/HTTP protocols.
+
+## Message Pattern Examples
+
+### Message Headers
+
+Shows metadata management using custom headers:
+
+```bash
+dotnet run --project
examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer
+dotnet run --project
examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer
+```
+
+Demonstrates using HeaderKey/HeaderValue for message metadata instead of
payload-based typing, with header-based message routing.
+
+### Message Envelopes
+
+JSON envelope pattern for polymorphic message handling:
+
+```bash
+dotnet run --project
examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer
+dotnet run --project
examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer
+```
+
+Uses MessagesGenerator to create OrderCreated, OrderConfirmed, and
OrderRejected messages wrapped in JSON envelopes for type identification.
+
+## Example Structure
+
+All examples can be executed directly from the repository. Follow these steps:
+
+1. **Start the Iggy server**: `cargo run --bin iggy-server`
+2. **Run desired example**: `dotnet run --project examples/csharp/src/xxx`
+3. **Check source code**
+
+These examples use IggyClient with TCP transport and demonstrate automatic
stream/topic creation with basic message handling.
+
+The examples are automatically tested via
`scripts/run-csharp-examples-from-readme.sh` to ensure they remain functional
and up-to-date with the latest API changes.
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Iggy_SDK.Examples.Basic.Consumer.csproj
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Iggy_SDK.Examples.Basic.Consumer.csproj
new file mode 100644
index 00000000..d02cde82
--- /dev/null
+++
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Iggy_SDK.Examples.Basic.Consumer.csproj
@@ -0,0 +1,24 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <WarningsAsErrors>true</WarningsAsErrors>
+ </PropertyGroup>
+ <ItemGroup>
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.Configuration"
Version="8.0.0"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Binder"
Version="8.0.2"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Json"
Version="8.0.1"/>
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
+ </ItemGroup>
+ <ItemGroup>
+ <None Update="appsettings.json">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
+ </ItemGroup>
+</Project>
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Program.cs
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Program.cs
new file mode 100644
index 00000000..daef1f69
--- /dev/null
+++ b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Program.cs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Factory;
+using Apache.Iggy.Kinds;
+using Iggy_SDK.Examples.Basic.Consumer;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var configuration = new
ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
+var settings = configuration.Get<Settings>() ?? new Settings();
+
+logger.LogInformation(
+ "Basic consumer has started, selected protocol: {Protocol}",
+ settings.Protocol
+);
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = settings.BaseAddress;
+ opt.Protocol = settings.Protocol;
+ },
+ loggerFactory
+);
+
+await client.LoginUser(settings.Username, settings.Password);
+
+logger.LogInformation("Basic consumer has logged on successfully");
+
+var streamId = Identifier.String(settings.StreamName);
+var topicId = Identifier.String(settings.TopicName);
+var partitionId = 1u;
+var consumerId = 1;
+
+await ExampleHelpers.EnsureStreamExists(client, streamId, settings.StreamName);
+await ExampleHelpers.EnsureTopicExists(
+ client,
+ streamId,
+ topicId,
+ settings.TopicName,
+ settings.PartitionsCount
+);
+
+var consumedBatches = 0;
+while (true)
+{
+ if (settings.MessageBatchesLimit > 0 && consumedBatches ==
settings.MessageBatchesLimit)
+ {
+ logger.LogInformation("Consumed {ConsumedBatches} batches of messages,
exiting.", consumedBatches);
+ break;
+ }
+
+ var response = await client.PollMessagesAsync(
+ streamId,
+ topicId,
+ partitionId,
+ Consumer.New(consumerId),
+ PollingStrategy.Next(),
+ 10,
+ true
+ );
+
+ foreach (var message in response.Messages)
+ logger.LogInformation(
+ "Handling message at offset: {Offset}, payload: {Payload}...",
+ message.Header.Offset,
+ Encoding.UTF8.GetString(message.Payload)
+ );
+
+ consumedBatches++;
+}
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Settings.cs
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Settings.cs
new file mode 100644
index 00000000..fef7bafa
--- /dev/null
+++ b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/Settings.cs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+
+namespace Iggy_SDK.Examples.Basic.Consumer;
+
+public sealed class Settings
+{
+ public Protocol Protocol { get; set; }
+ public string BaseAddress { get; set; } = null!;
+ public string Username { get; set; } = null!;
+ public string Password { get; set; } = null!;
+ public string StreamName { get; set; } = null!;
+ public string TopicName { get; set; } = null!;
+ public int MessageBatchesLimit { get; set; }
+ public uint PartitionsCount { get; set; }
+}
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/appsettings.json
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/appsettings.json
new file mode 100644
index 00000000..9dbfa097
--- /dev/null
+++
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Consumer/appsettings.json
@@ -0,0 +1,10 @@
+{
+ "Protocol": "tcp",
+ "BaseAddress": "127.0.0.1:8090",
+ "Username": "iggy",
+ "Password": "iggy",
+ "StreamName": "basic-example-stream",
+ "TopicName": "basic-example-topic",
+ "MessageBatchesLimit": 10,
+ "PartitionsCount": 1
+}
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Iggy_SDK.Examples.Basic.Producer.csproj
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Iggy_SDK.Examples.Basic.Producer.csproj
new file mode 100644
index 00000000..d02cde82
--- /dev/null
+++
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Iggy_SDK.Examples.Basic.Producer.csproj
@@ -0,0 +1,24 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <WarningsAsErrors>true</WarningsAsErrors>
+ </PropertyGroup>
+ <ItemGroup>
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.Configuration"
Version="8.0.0"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Binder"
Version="8.0.2"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Json"
Version="8.0.1"/>
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
+ </ItemGroup>
+ <ItemGroup>
+ <None Update="appsettings.json">
+ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+ </None>
+ </ItemGroup>
+</Project>
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Program.cs
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Program.cs
new file mode 100644
index 00000000..25276b5a
--- /dev/null
+++ b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Program.cs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Factory;
+using Apache.Iggy.Kinds;
+using Apache.Iggy.Messages;
+using Iggy_SDK.Examples.Basic.Producer;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+;
+var configuration = new
ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
+var settings = configuration.Get<Settings>() ?? new Settings();
+
+logger.LogInformation(
+ "Basic producer has started, selected protocol: {Protocol}",
+ settings.Protocol
+);
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = settings.BaseAddress;
+ opt.Protocol = settings.Protocol;
+ },
+ loggerFactory
+);
+
+await client.LoginUser(settings.Username, settings.Password);
+
+logger.LogInformation("Basic producer has logged on successfully");
+
+var streamId = Identifier.String(settings.StreamName);
+var topicId = Identifier.String(settings.TopicName);
+
+await ExampleHelpers.EnsureStreamExists(client, streamId, settings.StreamName);
+await ExampleHelpers.EnsureTopicExists(
+ client,
+ streamId,
+ topicId,
+ settings.TopicName,
+ settings.PartitionsCount
+);
+
+var sentBatches = 0;
+var currentId = 0;
+while (true)
+{
+ if (settings.MessageBatchesLimit > 0 && sentBatches ==
settings.MessageBatchesLimit)
+ {
+ logger.LogInformation("Sent {SentBatches} batches of messages,
exiting.", sentBatches);
+ break;
+ }
+
+ var payloads = Enumerable
+ .Range(currentId, settings.MessagesPerBatch)
+ .Aggregate(new List<string>(), (list, next) =>
+ {
+ list.Add($"message-{next}");
+ return list;
+ });
+
+ var messages = payloads.Select(payload => new Message(Guid.NewGuid(),
Encoding.UTF8.GetBytes(payload))).ToList();
+
+ await client.SendMessagesAsync(
+ new MessageSendRequest
+ {
+ StreamId = streamId,
+ TopicId = topicId,
+ Messages = messages,
+ Partitioning = Partitioning.None()
+ }
+ );
+
+ currentId += settings.MessagesPerBatch;
+ sentBatches++;
+ logger.LogInformation("Sent messages: {Messages}", payloads);
+
+ await Task.Delay(settings.Interval);
+}
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Settings.cs
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Settings.cs
new file mode 100644
index 00000000..bc068218
--- /dev/null
+++ b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/Settings.cs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+
+namespace Iggy_SDK.Examples.Basic.Producer;
+
+public sealed class Settings
+{
+ public Protocol Protocol { get; set; }
+ public string BaseAddress { get; set; } = null!;
+ public string Username { get; set; } = null!;
+ public string Password { get; set; } = null!;
+ public string StreamName { get; set; } = null!;
+ public string TopicName { get; set; } = null!;
+ public int MessageBatchesLimit { get; set; }
+ public int MessagesPerBatch { get; set; }
+ public TimeSpan Interval { get; set; }
+ public uint PartitionsCount { get; set; }
+}
diff --git
a/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/appsettings.json
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/appsettings.json
new file mode 100644
index 00000000..cfe52e7d
--- /dev/null
+++
b/examples/csharp/src/Basic/Iggy_SDK.Examples.Basic.Producer/appsettings.json
@@ -0,0 +1,12 @@
+{
+ "Protocol": "tcp",
+ "BaseAddress": "127.0.0.1:8090",
+ "Username": "iggy",
+ "Password": "iggy",
+ "StreamName": "basic-example-stream",
+ "TopicName": "basic-example-topic",
+ "MessageBatchesLimit": 10,
+ "MessagesPerBatch": 10,
+ "Interval": "00:00:01",
+ "PartitionsCount": 1
+}
diff --git a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Iggy_SDK.Examples.GettingStarted.Consumer.csproj
similarity index 53%
copy from foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
copy to
examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Iggy_SDK.Examples.GettingStarted.Consumer.csproj
index 17c1043b..75778e9e 100644
--- a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Iggy_SDK.Examples.GettingStarted.Consumer.csproj
@@ -1,21 +1,16 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
- <AssemblyName>Apache.Iggy.Consumer</AssemblyName>
- <RootNamespace>Apache.Iggy.Consumer</RootNamespace>
+ <WarningsAsErrors>true</WarningsAsErrors>
</PropertyGroup>
-
<ItemGroup>
- <ProjectReference Include="..\Iggy_SDK\Iggy_SDK.csproj" />
- <ProjectReference Include="..\Shared\Shared.csproj" />
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
</ItemGroup>
-
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.Logging.Console" />
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
</ItemGroup>
-
</Project>
diff --git
a/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Program.cs
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Program.cs
new file mode 100644
index 00000000..b0f9b13c
--- /dev/null
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Program.cs
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.GettingStarted.Consumer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.ConsumeMessages(client, logger);
diff --git
a/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Utils.cs
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Utils.cs
new file mode 100644
index 00000000..4245edfb
--- /dev/null
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Consumer/Utils.cs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Kinds;
+using Microsoft.Extensions.Logging;
+
+namespace Iggy_SDK.Examples.GettingStarted.Consumer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 2;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task ConsumeMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be consumed from stream: {StreamId}, topic:
{TopicId}, partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var offset = 0ul;
+ var messagesPerBatch = 10;
+ var consumedBatches = 0;
+ var consumer = Apache.Iggy.Kinds.Consumer.New(1);
+ while (true)
+ {
+ if (consumedBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Consumed {ConsumedBatches} batches of messages, exiting.",
+ consumedBatches
+ );
+ return;
+ }
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ var polledMessages = await client.PollMessagesAsync(
+ streamIdentifier,
+ topicIdentifier,
+ PARTITION_ID,
+ consumer,
+ PollingStrategy.Offset(offset),
+ messagesPerBatch,
+ false
+ );
+
+ if (!polledMessages.Messages.Any())
+ {
+ logger.LogInformation("No messages found.");
+ await Task.Delay(interval);
+ continue;
+ }
+
+ offset += (ulong)polledMessages.Messages.Count;
+ foreach (var message in polledMessages.Messages)
HandleMessage(message, logger);
+ consumedBatches++;
+ await Task.Delay(interval);
+ }
+ }
+
+ private static void HandleMessage(MessageResponse message, ILogger logger)
+ {
+ var payload = Encoding.UTF8.GetString(message.Payload);
+ logger.LogInformation(
+ "Handling message at offset: {Offset}, payload: {Payload}...",
+ message.Header.Offset,
+ payload
+ );
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git a/foreign/csharp/Iggy_Sample_Producer/Iggy_Sample_Producer.csproj
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Iggy_SDK.Examples.GettingStarted.Producer.csproj
similarity index 53%
rename from foreign/csharp/Iggy_Sample_Producer/Iggy_Sample_Producer.csproj
rename to
examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Iggy_SDK.Examples.GettingStarted.Producer.csproj
index 5c027756..75778e9e 100644
--- a/foreign/csharp/Iggy_Sample_Producer/Iggy_Sample_Producer.csproj
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Iggy_SDK.Examples.GettingStarted.Producer.csproj
@@ -1,21 +1,16 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
- <AssemblyName>Apache.Iggy.Producer</AssemblyName>
- <RootNamespace>Apache.Iggy.Producer</RootNamespace>
+ <WarningsAsErrors>true</WarningsAsErrors>
</PropertyGroup>
-
<ItemGroup>
- <ProjectReference Include="..\Iggy_SDK\Iggy_SDK.csproj" />
- <ProjectReference Include="..\Shared\Shared.csproj" />
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
</ItemGroup>
-
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.Logging.Console" />
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
</ItemGroup>
-
</Project>
diff --git
a/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Program.cs
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Program.cs
new file mode 100644
index 00000000..8b629635
--- /dev/null
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Program.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.GettingStarted.Producer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.InitSystem(client, logger);
+await Utils.ProduceMessages(client, logger);
diff --git
a/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Utils.cs
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Utils.cs
new file mode 100644
index 00000000..c1fe3f67
--- /dev/null
+++
b/examples/csharp/src/GettingStarted/Iggy_SDK.Examples.GettingStarted.Producer/Utils.cs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Exceptions;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Messages;
+using Microsoft.Extensions.Logging;
+using Partitioning = Apache.Iggy.Kinds.Partitioning;
+
+namespace Iggy_SDK.Examples.GettingStarted.Producer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 2;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task InitSystem(IIggyClient client, ILogger logger)
+ {
+ try
+ {
+ await client.CreateStreamAsync("getting-started-example-stream",
STREAM_ID);
+ logger.LogInformation("Stream was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Stream already exists and will not be created
again.");
+ }
+
+ try
+ {
+ await client.CreateTopicAsync(
+ Identifier.Numeric(STREAM_ID),
+ "getting-started-example-topic",
+ 1,
+ CompressionAlgorithm.None,
+ TOPIC_ID
+ );
+ logger.LogInformation("Topic was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Topic already exists and will not be created
again.");
+ }
+ }
+
+ public static async Task ProduceMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be sent to stream: {StreamId}, topic: {TopicId},
partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var currentId = 0;
+ var messagesPerBatch = 10;
+ var sentBatches = 0;
+ var partitioning = Partitioning.PartitionId((int)PARTITION_ID); //
should be uint
+ while (true)
+ {
+ if (sentBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Sent {SentBatches} batches of messages, exiting.",
+ sentBatches
+ );
+ return;
+ }
+
+ var payloads = Enumerable
+ .Range(currentId, messagesPerBatch)
+ .Aggregate(new List<string>(), (list, next) =>
+ {
+ list.Add($"message-{next}");
+ return list;
+ });
+
+ var messages = payloads.Select(payload => new
Message(Guid.NewGuid(), Encoding.UTF8.GetBytes(payload)))
+ .ToList();
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ await client.SendMessagesAsync(
+ new MessageSendRequest
+ {
+ StreamId = streamIdentifier,
+ TopicId = topicIdentifier,
+ Partitioning = partitioning,
+ Messages = messages
+ }
+ );
+
+ currentId += messagesPerBatch;
+ sentBatches++;
+ logger.LogInformation("Sent messages: {Messages}.", payloads);
+
+ await Task.Delay(interval);
+ }
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git a/examples/csharp/src/Iggy_SDK.Examples.Shared/ExampleHelpers.cs
b/examples/csharp/src/Iggy_SDK.Examples.Shared/ExampleHelpers.cs
new file mode 100644
index 00000000..cb9c5b29
--- /dev/null
+++ b/examples/csharp/src/Iggy_SDK.Examples.Shared/ExampleHelpers.cs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy;
+using Apache.Iggy.Exceptions;
+using Apache.Iggy.IggyClient;
+
+namespace Iggy_SDK.Examples.Shared;
+
+public static class ExampleHelpers
+{
+ public static async Task EnsureStreamExists(
+ IIggyClient client,
+ Identifier streamId,
+ string streamName,
+ CancellationToken token = default
+ )
+ {
+ try
+ {
+ await client.GetStreamByIdAsync(streamId, token);
+ }
+ catch (InvalidResponseException)
+ {
+ await client.CreateStreamAsync(streamName, token: token);
+ }
+ }
+
+ public static async Task EnsureTopicExists(
+ IIggyClient client,
+ Identifier streamId,
+ Identifier topicId,
+ string topicName,
+ uint partitionsCount,
+ CancellationToken cancellationToken = default
+ )
+ {
+ try
+ {
+ await client.GetTopicByIdAsync(streamId, topicId,
cancellationToken);
+ }
+ catch (InvalidResponseException)
+ {
+ await client.CreateTopicAsync(
+ streamId,
+ topicName,
+ partitionsCount,
+ token: cancellationToken
+ );
+ }
+ }
+}
diff --git
a/examples/csharp/src/Iggy_SDK.Examples.Shared/Iggy_SDK.Examples.Shared.csproj
b/examples/csharp/src/Iggy_SDK.Examples.Shared/Iggy_SDK.Examples.Shared.csproj
new file mode 100644
index 00000000..c3679463
--- /dev/null
+++
b/examples/csharp/src/Iggy_SDK.Examples.Shared/Iggy_SDK.Examples.Shared.csproj
@@ -0,0 +1,11 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <WarningsAsErrors>true</WarningsAsErrors>
+ </PropertyGroup>
+ <ItemGroup>
+ <ProjectReference
Include="..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ </ItemGroup>
+</Project>
diff --git a/examples/csharp/src/Iggy_SDK.Examples.Shared/Messages.cs
b/examples/csharp/src/Iggy_SDK.Examples.Shared/Messages.cs
new file mode 100644
index 00000000..3807b10e
--- /dev/null
+++ b/examples/csharp/src/Iggy_SDK.Examples.Shared/Messages.cs
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace Iggy_SDK.Examples.Shared;
+
+public class Envelope
+{
+ public const string OrderCreatedType = "order_created";
+ public const string OrderConfirmedType = "order_confirmed";
+ public const string OrderRejectedType = "order_rejected";
+
+ public Envelope(string messageType, ISerializableMessage payload)
+ {
+ MessageType = messageType;
+ Payload = payload.ToJson();
+ }
+
+ [JsonConstructor]
+ public Envelope(string messageType, string payload)
+ {
+ MessageType = messageType;
+ Payload = payload;
+ }
+
+ public string MessageType { get; }
+ public string Payload { get; }
+
+ public string ToJson()
+ {
+ return JsonSerializer.Serialize(this);
+ }
+}
+
+public interface ISerializableMessage
+{
+ string MessageType { get; }
+ string ToJson();
+ string ToJsonEnvelope();
+}
+
+public record OrderCreated(
+ ulong OrderId,
+ string CurrencyPair,
+ double Price,
+ double Quantity,
+ string Side,
+ DateTimeOffset Timestamp) : ISerializableMessage
+{
+ public string MessageType => Envelope.OrderCreatedType;
+
+ public string ToJson()
+ {
+ return JsonSerializer.Serialize(this);
+ }
+
+ public string ToJsonEnvelope()
+ {
+ return new Envelope(MessageType, this).ToJson();
+ }
+}
+
+public record OrderConfirmed(
+ ulong OrderId,
+ double Price,
+ DateTimeOffset Timestamp
+) : ISerializableMessage
+{
+ public string MessageType => Envelope.OrderConfirmedType;
+
+ public string ToJson()
+ {
+ return JsonSerializer.Serialize(this);
+ }
+
+ public string ToJsonEnvelope()
+ {
+ return new Envelope(MessageType, this).ToJson();
+ }
+}
+
+public record OrderRejected(
+ ulong OrderId,
+ DateTimeOffset Timestamp,
+ string Reason
+) : ISerializableMessage
+{
+ public string MessageType => Envelope.OrderRejectedType;
+
+ public string ToJson()
+ {
+ return JsonSerializer.Serialize(this);
+ }
+
+ public string ToJsonEnvelope()
+ {
+ return new Envelope(MessageType, this).ToJson();
+ }
+}
diff --git a/examples/csharp/src/Iggy_SDK.Examples.Shared/MessagesGenerator.cs
b/examples/csharp/src/Iggy_SDK.Examples.Shared/MessagesGenerator.cs
new file mode 100644
index 00000000..438ed96c
--- /dev/null
+++ b/examples/csharp/src/Iggy_SDK.Examples.Shared/MessagesGenerator.cs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Diagnostics;
+
+namespace Iggy_SDK.Examples.Shared;
+
+public class MessagesGenerator
+{
+ private ulong _orderId;
+
+ private static string[] CurrencyPairs => ["EUR/USD", "EUR/GBP", "USD/GBP",
"EUR/PLN", "USD/PLN"];
+
+ public ISerializableMessage Generate()
+ {
+ return (Random.Shared.Next() % 3) switch
+ {
+ 0 => GenerateOrderCreated(),
+ 1 => GenerateOrderConfirmed(),
+ 2 => GenerateOrderRejected(),
+ _ => throw new UnreachableException()
+ };
+ }
+
+ private OrderCreated GenerateOrderCreated()
+ {
+ _orderId++;
+ return new OrderCreated(
+ _orderId,
+ CurrencyPairs[Random.Shared.Next(0, CurrencyPairs.Length)],
+ Random.Shared.NextDouble() * 990.0 + 10.0,
+ Random.Shared.NextDouble() * 0.9 + 0.1,
+ Random.Shared.Next() % 2 == 1 ? "buy" : "sell",
+ DateTimeOffset.UtcNow
+ );
+ }
+
+ private OrderConfirmed GenerateOrderConfirmed()
+ {
+ return new OrderConfirmed(
+ _orderId,
+ Random.Shared.NextDouble() * 990.0 + 10.0,
+ DateTimeOffset.UtcNow
+ );
+ }
+
+ private OrderRejected GenerateOrderRejected()
+ {
+ return new OrderRejected(
+ _orderId,
+ DateTimeOffset.UtcNow,
+ Random.Shared.Next() % 2 == 1 ? "cancelled_by_user" : "other"
+ );
+ }
+}
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Iggy_SDK.Examples.MessageEnvelope.Consumer.csproj
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Iggy_SDK.Examples.MessageEnvelope.Consumer.csproj
new file mode 100644
index 00000000..17ada8c9
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Iggy_SDK.Examples.MessageEnvelope.Consumer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <WarningsAsErrors>true</WarningsAsErrors>
+ </PropertyGroup>
+ <ItemGroup>
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.Configuration"
Version="8.0.0"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Binder"
Version="8.0.2"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Json"
Version="8.0.1"/>
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
+ </ItemGroup>
+</Project>
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Program.cs
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Program.cs
new file mode 100644
index 00000000..c55e6488
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Program.cs
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.MessageEnvelope.Consumer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.ConsumeMessages(client, logger);
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Utils.cs
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Utils.cs
new file mode 100644
index 00000000..908c2a18
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Consumer/Utils.cs
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using System.Text.Json;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Kinds;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Logging;
+
+namespace Iggy_SDK.Examples.MessageEnvelope.Consumer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 4;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task ConsumeMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be consumed from stream: {StreamId}, topic:
{TopicId}, partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var offset = 0ul;
+ var messagesPerBatch = 10;
+ var consumedBatches = 0;
+ var consumer = Apache.Iggy.Kinds.Consumer.New(1);
+ while (true)
+ {
+ if (consumedBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Consumed {ConsumedBatches} batches of messages, exiting.",
+ consumedBatches
+ );
+ return;
+ }
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ var polledMessages = await client.PollMessagesAsync(
+ streamIdentifier,
+ topicIdentifier,
+ PARTITION_ID,
+ consumer,
+ PollingStrategy.Offset(offset),
+ messagesPerBatch,
+ false
+ );
+
+ if (!polledMessages.Messages.Any())
+ {
+ logger.LogInformation("No messages found.");
+ await Task.Delay(interval);
+ continue;
+ }
+
+ offset += (ulong)polledMessages.Messages.Count;
+ foreach (var message in polledMessages.Messages)
HandleMessage(message, logger);
+ consumedBatches++;
+ await Task.Delay(interval);
+ }
+ }
+
+ private static void HandleMessage(MessageResponse message, ILogger logger)
+ {
+ var payload = Encoding.UTF8.GetString(message.Payload);
+ var envelope = JsonSerializer.Deserialize<Envelope>(payload) ??
+ throw new Exception("Could not deserialize envelope.");
+
+ logger.LogInformation(
+ "Handling message type: {MessageType} at offset: {Offset}...",
+ envelope.MessageType,
+ message.Header.Offset
+ );
+
+ switch (envelope.MessageType)
+ {
+ case Envelope.OrderCreatedType:
+ var orderCreated =
JsonSerializer.Deserialize<OrderCreated>(envelope.Payload) ??
+ throw new Exception("Could not deserialize
order_created.");
+ logger.LogInformation("{OrderCreated}", orderCreated);
+ break;
+
+ case Envelope.OrderConfirmedType:
+ var orderConfirmed =
JsonSerializer.Deserialize<OrderConfirmed>(envelope.Payload) ??
+ throw new Exception("Could not
deserialize order_confirmed.");
+ logger.LogInformation("{OrderConfirmed}", orderConfirmed);
+ break;
+ case Envelope.OrderRejectedType:
+ var orderRejected =
JsonSerializer.Deserialize<OrderRejected>(envelope.Payload) ??
+ throw new Exception("Could not deserialize
order_rejected.");
+ logger.LogInformation("{OrderRejected}", orderRejected);
+ break;
+ default:
+ logger.LogWarning("Received unknown message type:
{MessageType}", envelope.MessageType);
+ break;
+ }
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Iggy_SDK.Examples.MessageEnvelope.Producer.csproj
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Iggy_SDK.Examples.MessageEnvelope.Producer.csproj
new file mode 100644
index 00000000..17ada8c9
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Iggy_SDK.Examples.MessageEnvelope.Producer.csproj
@@ -0,0 +1,19 @@
+<Project Sdk="Microsoft.NET.Sdk">
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>net8.0</TargetFramework>
+ <ImplicitUsings>enable</ImplicitUsings>
+ <Nullable>enable</Nullable>
+ <WarningsAsErrors>true</WarningsAsErrors>
+ </PropertyGroup>
+ <ItemGroup>
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
+ </ItemGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.Configuration"
Version="8.0.0"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Binder"
Version="8.0.2"/>
+ <PackageReference Include="Microsoft.Extensions.Configuration.Json"
Version="8.0.1"/>
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
+ </ItemGroup>
+</Project>
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Program.cs
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Program.cs
new file mode 100644
index 00000000..193fc81c
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Program.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.MessageEnvelope.Producer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.InitSystem(client, logger);
+await Utils.ProduceMessages(client, logger);
diff --git
a/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Utils.cs
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Utils.cs
new file mode 100644
index 00000000..82f2c23c
--- /dev/null
+++
b/examples/csharp/src/MessageEnvelope/Iggy_SDK.Examples.MessageEnvelope.Producer/Utils.cs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Exceptions;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Messages;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Logging;
+using Partitioning = Apache.Iggy.Kinds.Partitioning;
+
+namespace Iggy_SDK.Examples.MessageEnvelope.Producer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 4;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task InitSystem(IIggyClient client, ILogger logger)
+ {
+ try
+ {
+ await client.CreateStreamAsync("message-envelope-example-stream",
STREAM_ID);
+ logger.LogInformation("Stream was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Stream already exists and will not be created
again.");
+ }
+
+ try
+ {
+ await client.CreateTopicAsync(
+ Identifier.Numeric(STREAM_ID),
+ "message-envelope-example-topic",
+ 1,
+ CompressionAlgorithm.None,
+ TOPIC_ID
+ );
+ logger.LogInformation("Topic was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Topic already exists and will not be created
again.");
+ }
+ }
+
+ public static async Task ProduceMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be sent to stream: {StreamId}, topic: {TopicId},
partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var messagesPerBatch = 10;
+ var sentBatches = 0;
+ var messagesGenerator = new MessagesGenerator();
+ var partitioning = Partitioning.PartitionId((int)PARTITION_ID);
+
+ while (true)
+ {
+ if (sentBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Sent {SentBatches} batches of messages, exiting.",
+ sentBatches
+ );
+ return;
+ }
+
+ var serializableMessages = Enumerable
+ .Range(0, messagesPerBatch)
+ .Aggregate(new List<ISerializableMessage>(), (list, _) =>
+ {
+ var serializableMessage = messagesGenerator.Generate();
+ list.Add(serializableMessage);
+ return list;
+ });
+
+ var messages = serializableMessages.Select(serializableMessage =>
+ {
+ var jsonEnvelope = serializableMessage.ToJsonEnvelope();
+ return new Message(Guid.NewGuid(),
Encoding.UTF8.GetBytes(jsonEnvelope));
+ }
+ ).ToList();
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ logger.LogInformation("Sending messages count: {Count}",
messagesPerBatch);
+
+ await client.SendMessagesAsync(
+ new MessageSendRequest
+ {
+ StreamId = streamIdentifier,
+ TopicId = topicIdentifier,
+ Partitioning = partitioning,
+ Messages = messages
+ }
+ );
+
+ sentBatches++;
+ logger.LogInformation("Sent messages: {Messages}.",
serializableMessages);
+
+ await Task.Delay(interval);
+ }
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Iggy_SDK.Examples.MessageHeaders.Consumer.csproj
similarity index 53%
copy from foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
copy to
examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Iggy_SDK.Examples.MessageHeaders.Consumer.csproj
index 17c1043b..75778e9e 100644
--- a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Iggy_SDK.Examples.MessageHeaders.Consumer.csproj
@@ -1,21 +1,16 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
- <AssemblyName>Apache.Iggy.Consumer</AssemblyName>
- <RootNamespace>Apache.Iggy.Consumer</RootNamespace>
+ <WarningsAsErrors>true</WarningsAsErrors>
</PropertyGroup>
-
<ItemGroup>
- <ProjectReference Include="..\Iggy_SDK\Iggy_SDK.csproj" />
- <ProjectReference Include="..\Shared\Shared.csproj" />
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
</ItemGroup>
-
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.Logging.Console" />
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
</ItemGroup>
-
</Project>
diff --git
a/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Program.cs
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Program.cs
new file mode 100644
index 00000000..f53d547a
--- /dev/null
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Program.cs
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.MessageHeaders.Consumer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.ConsumeMessages(client, logger);
diff --git
a/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Utils.cs
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Utils.cs
new file mode 100644
index 00000000..770d11b1
--- /dev/null
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Consumer/Utils.cs
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using System.Text.Json;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Headers;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Kinds;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Logging;
+
+namespace Iggy_SDK.Examples.MessageHeaders.Consumer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 3;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task ConsumeMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be consumed from stream: {StreamId}, topic:
{TopicId}, partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var offset = 0ul;
+ var messagesPerBatch = 10;
+ var consumedBatches = 0;
+ var consumer = Apache.Iggy.Kinds.Consumer.New(1);
+ while (true)
+ {
+ if (consumedBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Consumed {ConsumedBatches} batches of messages, exiting.",
+ consumedBatches
+ );
+ return;
+ }
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ var polledMessages = await client.PollMessagesAsync(
+ streamIdentifier,
+ topicIdentifier,
+ PARTITION_ID,
+ consumer,
+ PollingStrategy.Offset(offset),
+ messagesPerBatch,
+ false
+ );
+
+ if (!polledMessages.Messages.Any())
+ {
+ logger.LogInformation("No messages found.");
+ await Task.Delay(interval);
+ continue;
+ }
+
+ offset += (ulong)polledMessages.Messages.Count;
+ foreach (var message in polledMessages.Messages)
HandleMessage(message, logger);
+ consumedBatches++;
+ await Task.Delay(interval);
+ }
+ }
+
+ private static void HandleMessage(MessageResponse message, ILogger logger)
+ {
+ var headerKey = HeaderKey.New("message_type");
+ var headersMap = message.UserHeaders ?? throw new Exception("Missing
headers map.");
+ var messageType = Encoding.UTF8.GetString(headersMap[headerKey].Value);
+
+ logger.LogInformation(
+ "Handling message type: {MessageType} at offset: {Offset}...",
+ messageType,
+ message.Header.Offset
+ );
+
+ switch (messageType)
+ {
+ case Envelope.OrderCreatedType:
+ var orderCreated =
JsonSerializer.Deserialize<OrderCreated>(message.Payload) ??
+ throw new Exception("Could not deserialize
order_created.");
+ logger.LogInformation("{OrderCreated}", orderCreated);
+ break;
+
+ case Envelope.OrderConfirmedType:
+ var orderConfirmed =
JsonSerializer.Deserialize<OrderConfirmed>(message.Payload) ??
+ throw new Exception("Could not
deserialize order_confirmed.");
+ logger.LogInformation("{OrderConfirmed}", orderConfirmed);
+ break;
+ case Envelope.OrderRejectedType:
+ var orderRejected =
JsonSerializer.Deserialize<OrderRejected>(message.Payload) ??
+ throw new Exception("Could not deserialize
order_rejected.");
+ logger.LogInformation("{OrderRejected}", orderRejected);
+ break;
+ default:
+ logger.LogWarning("Received unknown message type:
{MessageType}", messageType);
+ break;
+ }
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Iggy_SDK.Examples.MessageHeaders.Producer.csproj
similarity index 53%
rename from foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
rename to
examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Iggy_SDK.Examples.MessageHeaders.Producer.csproj
index 17c1043b..75778e9e 100644
--- a/foreign/csharp/Iggy_Sample_Consumer/Iggy_Sample_Consumer.csproj
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Iggy_SDK.Examples.MessageHeaders.Producer.csproj
@@ -1,21 +1,16 @@
-<Project Sdk="Microsoft.NET.Sdk">
-
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
- <AssemblyName>Apache.Iggy.Consumer</AssemblyName>
- <RootNamespace>Apache.Iggy.Consumer</RootNamespace>
+ <WarningsAsErrors>true</WarningsAsErrors>
</PropertyGroup>
-
<ItemGroup>
- <ProjectReference Include="..\Iggy_SDK\Iggy_SDK.csproj" />
- <ProjectReference Include="..\Shared\Shared.csproj" />
+ <ProjectReference
Include="..\..\..\..\..\foreign\csharp\Iggy_SDK\Iggy_SDK.csproj"/>
+ <ProjectReference
Include="..\..\Iggy_SDK.Examples.Shared\Iggy_SDK.Examples.Shared.csproj"/>
</ItemGroup>
-
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.Logging.Console" />
+ <PackageReference Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1"/>
</ItemGroup>
-
</Project>
diff --git
a/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Program.cs
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Program.cs
new file mode 100644
index 00000000..f265e74a
--- /dev/null
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Program.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Apache.Iggy.Enums;
+using Apache.Iggy.Factory;
+using Iggy_SDK.Examples.MessageHeaders.Producer;
+using Microsoft.Extensions.Logging;
+
+var loggerFactory = LoggerFactory.Create(b => { b.AddConsole(); });
+var logger = loggerFactory.CreateLogger<Program>();
+
+var client = MessageStreamFactory.CreateMessageStream(
+ opt =>
+ {
+ opt.BaseAdress = Utils.GetTcpServerAddr(args, logger);
+ opt.Protocol = Protocol.Tcp;
+ },
+ loggerFactory
+);
+
+await client.LoginUser("iggy", "iggy");
+
+await Utils.InitSystem(client, logger);
+await Utils.ProduceMessages(client, logger);
diff --git
a/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Utils.cs
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Utils.cs
new file mode 100644
index 00000000..0b316468
--- /dev/null
+++
b/examples/csharp/src/MessageHeaders/Iggy_SDK.Examples.MessageHeaders.Producer/Utils.cs
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Net;
+using System.Text;
+using Apache.Iggy;
+using Apache.Iggy.Contracts;
+using Apache.Iggy.Enums;
+using Apache.Iggy.Exceptions;
+using Apache.Iggy.Headers;
+using Apache.Iggy.IggyClient;
+using Apache.Iggy.Messages;
+using Iggy_SDK.Examples.Shared;
+using Microsoft.Extensions.Logging;
+using Partitioning = Apache.Iggy.Kinds.Partitioning;
+
+namespace Iggy_SDK.Examples.MessageHeaders.Producer;
+
+public static class Utils
+{
+ private const uint STREAM_ID = 1;
+ private const uint TOPIC_ID = 3;
+ private const uint PARTITION_ID = 1;
+ private const uint BATCHES_LIMIT = 5;
+
+ public static async Task InitSystem(IIggyClient client, ILogger logger)
+ {
+ try
+ {
+ await client.CreateStreamAsync("message-headers-example-stream",
STREAM_ID);
+ logger.LogInformation("Stream was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Stream already exists and will not be created
again.");
+ }
+
+ try
+ {
+ await client.CreateTopicAsync(
+ Identifier.Numeric(STREAM_ID),
+ "message-headers-example-topic",
+ 1,
+ CompressionAlgorithm.None,
+ TOPIC_ID
+ );
+ logger.LogInformation("Topic was created.");
+ }
+ catch (InvalidResponseException)
+ {
+ logger.LogWarning("Topic already exists and will not be created
again.");
+ }
+ }
+
+ public static async Task ProduceMessages(IIggyClient client, ILogger
logger)
+ {
+ var interval = TimeSpan.FromMilliseconds(500);
+ logger.LogInformation(
+ "Messages will be sent to stream: {StreamId}, topic: {TopicId},
partition: {PartitionId} with interval {Interval}.",
+ STREAM_ID,
+ TOPIC_ID,
+ PARTITION_ID,
+ interval
+ );
+
+ var messagesPerBatch = 10;
+ var sentBatches = 0;
+ var messagesGenerator = new MessagesGenerator();
+ var partitioning = Partitioning.PartitionId((int)PARTITION_ID);
+
+ while (true)
+ {
+ if (sentBatches == BATCHES_LIMIT)
+ {
+ logger.LogInformation(
+ "Sent {SentBatches} batches of messages, exiting.",
+ sentBatches
+ );
+ return;
+ }
+
+ var serializableMessages = Enumerable
+ .Range(0, messagesPerBatch)
+ .Aggregate(new List<ISerializableMessage>(), (list, _) =>
+ {
+ var serializableMessage = messagesGenerator.Generate();
+ list.Add(serializableMessage);
+ return list;
+ });
+
+ var messages = serializableMessages.Select(serializableMessage =>
+ {
+ var jsonEnvelope = serializableMessage.ToJson();
+ return new Message(Guid.NewGuid(),
Encoding.UTF8.GetBytes(jsonEnvelope),
+ new Dictionary<HeaderKey, HeaderValue>
+ {
+ { HeaderKey.New("message_type"),
HeaderValue.FromString(serializableMessage.MessageType) }
+ });
+ }
+ ).ToList();
+
+ var streamIdentifier = Identifier.Numeric(STREAM_ID);
+ var topicIdentifier = Identifier.Numeric(TOPIC_ID);
+ logger.LogInformation("Sending messages count: {Count}",
messagesPerBatch);
+
+ await client.SendMessagesAsync(
+ new MessageSendRequest
+ {
+ StreamId = streamIdentifier,
+ TopicId = topicIdentifier,
+ Partitioning = partitioning,
+ Messages = messages
+ }
+ );
+
+ sentBatches++;
+ logger.LogInformation("Sent messages: {Messages}.",
serializableMessages);
+
+ await Task.Delay(interval);
+ }
+ }
+
+ public static string GetTcpServerAddr(string[] args, ILogger logger)
+ {
+ var defaultServerAddr = "127.0.0.1:8090";
+ var argumentName = args.Length > 0 ? args[0] : null;
+ var tcpServerAddr = args.Length > 1 ? args[1] : null;
+
+ if (argumentName is null && tcpServerAddr is null) return
defaultServerAddr;
+
+ argumentName = argumentName ?? throw new
ArgumentNullException(argumentName);
+ if (argumentName != "--tcp-server-address")
+ throw new FormatException(
+ $"Invalid argument {argumentName}! Usage: --tcp-server-address
<server-address>"
+ );
+ tcpServerAddr = tcpServerAddr ?? throw new
ArgumentNullException(tcpServerAddr);
+ if (!IPEndPoint.TryParse(tcpServerAddr, out _))
+ throw new FormatException(
+ $"Invalid server address {tcpServerAddr}! Usage:
--tcp-server-address <server-address>"
+ );
+ logger.LogInformation("Using server address: {TcpServerAddr}",
tcpServerAddr);
+ return tcpServerAddr;
+ }
+}
diff --git a/foreign/csharp/Iggy_SDK.sln b/foreign/csharp/Iggy_SDK.sln
index b1b746e0..611c2f84 100644
--- a/foreign/csharp/Iggy_SDK.sln
+++ b/foreign/csharp/Iggy_SDK.sln
@@ -4,10 +4,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") =
"Iggy_SDK", "Iggy_SDK\Iggy_S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_SDK_Tests",
"Iggy_SDK_Tests\Iggy_SDK_Tests.csproj", "{8514D555-35CF-4538-97D3-DC42A182B694}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_Sample_Producer",
"Iggy_Sample_Producer\Iggy_Sample_Producer.csproj",
"{ABF3900E-D9C1-46F8-97B1-E0918DCFC89C}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Iggy_Sample_Consumer",
"Iggy_Sample_Consumer\Iggy_Sample_Consumer.csproj",
"{2ED2B03B-369E-493C-9AB3-0FF7EA9AFDE2}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared",
"Shared\Shared.csproj", "{A9D91A9B-606D-469E-BAD9-B220A4AB31CA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks",
"Benchmarks\Benchmarks.csproj", "{466C54AD-A5FA-401C-9300-8FE21D3C355A}"
@@ -40,14 +36,6 @@ Global
{8514D555-35CF-4538-97D3-DC42A182B694}.Debug|Any CPU.Build.0 =
Debug|Any CPU
{8514D555-35CF-4538-97D3-DC42A182B694}.Release|Any
CPU.ActiveCfg = Release|Any CPU
{8514D555-35CF-4538-97D3-DC42A182B694}.Release|Any CPU.Build.0
= Release|Any CPU
- {ABF3900E-D9C1-46F8-97B1-E0918DCFC89C}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
- {ABF3900E-D9C1-46F8-97B1-E0918DCFC89C}.Debug|Any CPU.Build.0 =
Debug|Any CPU
- {ABF3900E-D9C1-46F8-97B1-E0918DCFC89C}.Release|Any
CPU.ActiveCfg = Release|Any CPU
- {ABF3900E-D9C1-46F8-97B1-E0918DCFC89C}.Release|Any CPU.Build.0
= Release|Any CPU
- {2ED2B03B-369E-493C-9AB3-0FF7EA9AFDE2}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
- {2ED2B03B-369E-493C-9AB3-0FF7EA9AFDE2}.Debug|Any CPU.Build.0 =
Debug|Any CPU
- {2ED2B03B-369E-493C-9AB3-0FF7EA9AFDE2}.Release|Any
CPU.ActiveCfg = Release|Any CPU
- {2ED2B03B-369E-493C-9AB3-0FF7EA9AFDE2}.Release|Any CPU.Build.0
= Release|Any CPU
{A9D91A9B-606D-469E-BAD9-B220A4AB31CA}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
{A9D91A9B-606D-469E-BAD9-B220A4AB31CA}.Debug|Any CPU.Build.0 =
Debug|Any CPU
{A9D91A9B-606D-469E-BAD9-B220A4AB31CA}.Release|Any
CPU.ActiveCfg = Release|Any CPU
diff --git a/foreign/csharp/Iggy_Sample_Consumer/Program.cs
b/foreign/csharp/Iggy_Sample_Consumer/Program.cs
deleted file mode 100644
index 073f9eba..00000000
--- a/foreign/csharp/Iggy_Sample_Consumer/Program.cs
+++ /dev/null
@@ -1,227 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Security.Cryptography;
-using System.Text;
-using System.Text.Json;
-using Apache.Iggy;
-using Apache.Iggy.Contracts;
-using Apache.Iggy.Enums;
-using Apache.Iggy.Factory;
-using Apache.Iggy.Kinds;
-using Apache.Iggy.Shared;
-using Microsoft.Extensions.Logging;
-
-var jsonOptions = new JsonSerializerOptions();
-jsonOptions.PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower;
-jsonOptions.WriteIndented = true;
-var protocol = Protocol.Tcp;
-var loggerFactory = LoggerFactory.Create(builder =>
-{
- builder
- .AddFilter("Iggy_SDK.IggyClient.Implementations;", LogLevel.Trace)
- .AddConsole();
-});
-var bus = MessageStreamFactory.CreateMessageStream(options =>
-{
- options.BaseAdress = "127.0.0.1:8090";
- options.Protocol = protocol;
-
- options.MessageBatchingSettings = x =>
- {
- x.Enabled = false;
- x.Interval = TimeSpan.FromMilliseconds(100);
- x.MaxMessagesPerBatch = 1000;
- x.MaxRequests = 4096;
- };
- options.MessagePollingSettings = x =>
- {
- x.Interval = TimeSpan.FromMilliseconds(100);
- x.StoreOffsetStrategy = StoreOffset.AfterProcessingEachMessage;
- };
- options.TlsSettings = x =>
- {
- x.Enabled = false;
- x.Hostname = "iggy";
- x.Authenticate = false;
- };
-}, loggerFactory);
-
-var response = await bus.LoginUser("iggy", "iggy");
-
-Console.WriteLine("Using protocol : {0}", protocol.ToString());
-var streamIdVal = 1u;
-var topicIdVal = 1u;
-var streamId = Identifier.Numeric(streamIdVal);
-var topicId = Identifier.Numeric(topicIdVal);
-var partitionId = (uint)3;
-var consumerId = 1;
-
-
-Console.WriteLine($"Consumer has started, selected protocol {protocol}");
-
-await ValidateSystem(streamId, topicId, partitionId);
-await ConsumeMessages();
-
-async Task ConsumeMessages()
-{
- var intervalInMs = 1000;
- Console.WriteLine(
- $"Messages will be polled from stream {streamId}, topic {topicId},
partition {partitionId} with interval {intervalInMs} ms");
- Func<byte[], Envelope> deserializer = serializedData =>
- {
- var envelope = new Envelope();
- var messageTypeLength = BitConverter.ToInt32(serializedData, 0);
- envelope.MessageType = Encoding.UTF8.GetString(serializedData, 4,
messageTypeLength);
- envelope.Payload = Encoding.UTF8.GetString(serializedData, 4 +
messageTypeLength,
- serializedData.Length - (4 + messageTypeLength));
- return envelope;
- };
- Func<byte[], byte[]> decryptor = static payload =>
- {
- var aes_key = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
- var aes_iv = "bsxnWolsAyO7kCfWuyrnqg==";
- var key = Convert.FromBase64String(aes_key);
- var iv = Convert.FromBase64String(aes_iv);
-
- using var aes = Aes.Create();
- var decryptor = aes.CreateDecryptor(key, iv);
- using var memoryStream = new MemoryStream(payload);
- using var cryptoStream = new CryptoStream(memoryStream, decryptor,
CryptoStreamMode.Read);
- using var binaryReader = new BinaryReader(cryptoStream);
- return binaryReader.ReadBytes(payload.Length);
- };
-
- PolledMessages<Envelope> messages = await bus.PollMessagesAsync(new
MessageFetchRequest
- {
- StreamId = streamId,
- TopicId = topicId,
- Consumer = Consumer.New(1),
- Count = 1,
- PartitionId = 1,
- PollingStrategy = PollingStrategy.Next(),
- AutoCommit = true
- }, deserializer, decryptor);
- await foreach (MessageResponse<Envelope> msgResponse in
bus.PollMessagesAsync(
- new PollMessagesRequest
- {
- Consumer = Consumer.New(consumerId),
- Count = 1,
- TopicId = topicId,
- StreamId = streamId,
- PartitionId = partitionId,
- PollingStrategy = PollingStrategy.Next()
- }, deserializer, decryptor))
- {
- HandleMessage(msgResponse);
- }
-}
-
-void HandleMessage(MessageResponse<Envelope> messageResponse)
-{
- Console.Write(
- $"Handling message type: {messageResponse.Message.MessageType} with
checksum: {messageResponse.Header.Checksum}, at offset:
{messageResponse.Header.Offset} with message
Id:{messageResponse.Header.Id.ToString()} ");
- Console.WriteLine();
-
Console.WriteLine("---------------------------MESSAGE-----------------------------------");
- Console.WriteLine();
-
- switch (messageResponse.Message.MessageType)
- {
- case "order_created":
- {
- var orderCreated =
JsonSerializer.Deserialize<OrderCreated>(messageResponse.Message.Payload,
jsonOptions);
- Console.WriteLine(orderCreated);
- break;
- }
- case "order_confirmed":
- {
- var orderConfirmed =
-
JsonSerializer.Deserialize<OrderConfirmed>(messageResponse.Message.Payload,
jsonOptions);
- Console.WriteLine(orderConfirmed);
- break;
- }
- case "order_rejected":
- {
- var orderRejected =
JsonSerializer.Deserialize<OrderRejected>(messageResponse.Message.Payload,
jsonOptions);
- Console.WriteLine(orderRejected);
- break;
- }
- }
-
-
- if (messageResponse.UserHeaders is not null)
- {
- Console.WriteLine();
-
Console.WriteLine("---------------------------HEADERS-----------------------------------");
- Console.WriteLine();
- foreach (var (headerKey, headerValue) in messageResponse.UserHeaders)
- {
- Console.WriteLine("Found Header: {0} with value: {1}, ",
headerKey.ToString(), headerValue.ToString());
- }
-
- Console.WriteLine();
- }
- //await Task.Delay(1000);
-}
-
-
-async Task ValidateSystem(Identifier streamId, Identifier topicId, uint
partitionId)
-{
- try
- {
- Console.WriteLine($"Validating if stream exists.. {streamId}");
-
- var result = await bus.GetStreamByIdAsync(streamId);
-
- Console.WriteLine(result!.Name);
-
- Console.WriteLine($"Validating if topic exists.. {topicId}");
-
- var topicResult = await bus.GetTopicByIdAsync(streamId, topicId);
-
- if (topicResult!.PartitionsCount < partitionId)
- {
- throw new SystemException(
- $"Topic {topicId} has only {topicResult.PartitionsCount}
partitions, but partition {partitionId} was requested");
- }
- }
- catch
- {
- Console.WriteLine($"Creating stream with {streamId}");
-
- await bus.CreateStreamAsync("Test Consumer Stream", streamIdVal);
-
- Console.WriteLine($"Creating topic with {topicId}");
-
- await bus.CreateTopicAsync(streamId,
- topicId: topicIdVal,
- name: "Test Consumer Topic",
- compressionAlgorithm: CompressionAlgorithm.None,
- messageExpiry: 0,
- maxTopicSize: 1_000_000_000,
- replicationFactor: 3,
- partitionsCount: 3);
-
- var topicRes = await bus.GetTopicByIdAsync(streamId, topicId);
-
- if (topicRes!.PartitionsCount < partitionId)
- {
- throw new SystemException(
- $"Topic {topicId} has only {topicRes.PartitionsCount}
partitions, but partition {partitionId} was requested");
- }
- }
-}
\ No newline at end of file
diff --git a/foreign/csharp/Iggy_Sample_Producer/MessageGenerator.cs
b/foreign/csharp/Iggy_Sample_Producer/MessageGenerator.cs
deleted file mode 100644
index 4a45618c..00000000
--- a/foreign/csharp/Iggy_Sample_Producer/MessageGenerator.cs
+++ /dev/null
@@ -1,85 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using Apache.Iggy.Shared;
-
-namespace Apache.Iggy.Producer;
-
-public static class MessageGenerator
-{
- private static int OrderCreatedId;
- private static int OrderConfirmedId;
- private static int OrderRejectedId;
-
- public static ISerializableMessage GenerateMessage()
- {
- return Random.Shared.Next(1, 4) switch
- {
- 1 => GenerateOrderRejectedMessage(),
- 2 => GenerateOrderConfirmedMessage(),
- 3 => GenerateOrderCreatedMessage(),
- _ => GenerateOrderCreatedMessage()
- };
- }
-
- private static ISerializableMessage GenerateOrderCreatedMessage()
- {
- return new OrderCreated
- {
- Id = OrderCreatedId++,
- CurrencyPair = Random.Shared.Next(1, 3) switch
- {
- 1 => "BTC/USDT",
- 2 => "ETH/USDT",
- _ => "LTC/USDT"
- },
- Price = Random.Shared.Next(69, 420),
- Quantity = Random.Shared.Next(69, 420),
- Side = Random.Shared.Next(1, 2) switch
- {
- 1 => "Buy",
- _ => "Sell"
- },
- Timestamp = (ulong)Random.Shared.Next(420, 69420)
- };
- }
-
- private static ISerializableMessage GenerateOrderConfirmedMessage()
- {
- return new OrderConfirmed
- {
- Id = OrderConfirmedId++,
- Price = Random.Shared.Next(69, 420),
- Timestamp = (ulong)Random.Shared.Next(420, 69420)
- };
- }
-
- private static ISerializableMessage GenerateOrderRejectedMessage()
- {
- return new OrderRejected
- {
- Id = OrderRejectedId++,
- Timestamp = (ulong)Random.Shared.Next(421, 69420),
- Reason = Random.Shared.Next(1, 3) switch
- {
- 1 => "Cancelled by user",
- 2 => "Insufficient funds",
- _ => "Other"
- }
- };
- }
-}
\ No newline at end of file
diff --git a/foreign/csharp/Iggy_Sample_Producer/Program.cs
b/foreign/csharp/Iggy_Sample_Producer/Program.cs
deleted file mode 100644
index 13e5c4d7..00000000
--- a/foreign/csharp/Iggy_Sample_Producer/Program.cs
+++ /dev/null
@@ -1,224 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System.Buffers.Binary;
-using System.Security.Cryptography;
-using System.Text;
-using Apache.Iggy;
-using Apache.Iggy.Contracts;
-using Apache.Iggy.Enums;
-using Apache.Iggy.Factory;
-using Apache.Iggy.Headers;
-using Apache.Iggy.IggyClient;
-using Apache.Iggy.Messages;
-using Apache.Iggy.Producer;
-using Apache.Iggy.Shared;
-using Microsoft.Extensions.Logging;
-using Partitioning = Apache.Iggy.Kinds.Partitioning;
-
-var protocol = Protocol.Tcp;
-var loggerFactory = LoggerFactory.Create(builder =>
-{
- builder
- .AddFilter("Iggy_SDK.IggyClient.Implementations;", LogLevel.Trace)
- .AddConsole();
-});
-var bus = MessageStreamFactory.CreateMessageStream(options =>
-{
- options.BaseAdress = "127.0.0.1:8090";
- options.Protocol = protocol;
- options.MessageBatchingSettings = x =>
- {
- x.Enabled = false;
- x.Interval = TimeSpan.FromMilliseconds(101);
- x.MaxMessagesPerBatch = 1000;
- x.MaxRequests = 4096;
- };
- options.TlsSettings = x =>
- {
- x.Enabled = false;
- x.Hostname = "iggy";
- x.Authenticate = false;
- };
-}, loggerFactory);
-
-try
-{
- var response = await bus.LoginUser("iggy", "iggy");
-}
-catch
-{
- await bus.CreateUser("test_user", "pa55w0rD!@", UserStatus.Active);
-
- var response = await bus.LoginUser("iggy", "iggy");
-}
-
-Console.WriteLine("Using protocol : {0}", protocol.ToString());
-
-var streamIdVal = 1u;
-var topicIdVal = 1u;
-var streamId = Identifier.Numeric(streamIdVal);
-var topicId = Identifier.Numeric(topicIdVal);
-
-Console.WriteLine($"Producer has started, selected protocol
{protocol.ToString()}");
-
-try
-{
- var stream = await bus.GetStreamByIdAsync(streamId);
- var topic = await bus.GetTopicByIdAsync(streamId, topicId);
-}
-catch
-{
- Console.WriteLine($"Creating stream with id:{streamId}");
- await bus.CreateStreamAsync("producer-stream", streamIdVal);
-
- Console.WriteLine($"Creating topic with id:{topicId}");
- await bus.CreateTopicAsync(streamId,
- topicId: topicIdVal,
- name: "producer-topic",
- compressionAlgorithm: CompressionAlgorithm.None,
- messageExpiry: 0,
- maxTopicSize: 0,
- replicationFactor: 3,
- partitionsCount: 3);
-}
-
-var actualStream = await bus.GetStreamByIdAsync(streamId);
-var actualTopic = await bus.GetTopicByIdAsync(streamId, topicId);
-
-await ProduceMessages(bus, actualStream, actualTopic);
-
-async Task ProduceMessages(IIggyClient bus, StreamResponse? stream,
TopicResponse? topic)
-{
- var messageBatchCount = 1;
- var intervalInMs = 1000;
- Console.WriteLine(
- $"Messages will be sent to stream {stream!.Id}, topic {topic!.Id},
partition {topic.PartitionsCount} with interval {intervalInMs} ms");
- Func<Envelope, byte[]> serializer = static envelope =>
- {
- Span<byte> buffer = stackalloc byte[envelope.MessageType.Length + 4 +
envelope.Payload.Length];
- BinaryPrimitives.WriteInt32LittleEndian(buffer[..4],
envelope.MessageType.Length);
-
Encoding.UTF8.GetBytes(envelope.MessageType).CopyTo(buffer[4..(envelope.MessageType.Length
+ 4)]);
-
Encoding.UTF8.GetBytes(envelope.Payload).CopyTo(buffer[(envelope.MessageType.Length
+ 4)..]);
- return buffer.ToArray();
- };
- //can this be optimized ? this lambda doesn't seem to get cached
- Func<byte[], byte[]> encryptor = static payload =>
- {
- var aes_key = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
- var aes_iv = "bsxnWolsAyO7kCfWuyrnqg==";
- var key = Convert.FromBase64String(aes_key);
- var iv = Convert.FromBase64String(aes_iv);
-
- using var aes = Aes.Create();
- var encryptor = aes.CreateEncryptor(key, iv);
- using var memoryStream = new MemoryStream();
- using var cryptoStream = new CryptoStream(memoryStream, encryptor,
CryptoStreamMode.Write);
- using (var streamWriter = new BinaryWriter(cryptoStream))
- {
- streamWriter.Write(payload);
- }
-
- return memoryStream.ToArray();
- };
-
- var byteArray = new byte[] { 6, 9, 4, 2, 0 };
-
-
- var headers = new Dictionary<HeaderKey, HeaderValue>();
- headers.Add(new HeaderKey { Value = "key_1".ToLower() },
HeaderValue.FromString("test-value-1"));
- headers.Add(new HeaderKey { Value = "key_2".ToLower() },
HeaderValue.FromInt32(69));
- headers.Add(new HeaderKey { Value = "key_3".ToLower() },
HeaderValue.FromFloat(420.69f));
- headers.Add(new HeaderKey { Value = "key_4".ToLower() },
HeaderValue.FromBool(true));
- headers.Add(new HeaderKey { Value = "key_5".ToLower() },
HeaderValue.FromBytes(byteArray));
- headers.Add(new HeaderKey { Value = "key_6".ToLower() },
HeaderValue.FromInt128(new Int128(6969696969, 420420420)));
- headers.Add(new HeaderKey { Value = "key_7".ToLower() },
HeaderValue.FromGuid(Guid.NewGuid()));
-
- while (true)
- {
- var debugMessages = new List<ISerializableMessage>();
- var messages = new Envelope[messageBatchCount];
-
- for (var i = 0; i < messageBatchCount; i++)
- {
- var message = MessageGenerator.GenerateMessage();
- var envelope = message.ToEnvelope();
-
- debugMessages.Add(message);
- messages[i] = envelope;
- }
-
- var messagesSerialized = new List<Message>();
- foreach (var message in messages)
- {
- messagesSerialized.Add(new Message(Guid.NewGuid(),
encryptor(serializer(message)), headers));
- }
-
- try
- {
- await bus.SendMessagesAsync(new MessageSendRequest<Envelope>
- {
- StreamId = streamId,
- TopicId = topicId,
- Partitioning = Partitioning.PartitionId(3),
- Messages = messages
- },
- serializer,
- encryptor, headers);
- }
- catch (Exception e)
- {
- Console.WriteLine(e.Message);
- throw;
- }
-
- Console.WriteLine(
- $"Sent messages: {string.Join(Environment.NewLine,
debugMessages.ConvertAll(m => m.ToString()))}");
- await Task.Delay(intervalInMs);
- }
-}
-
-
-namespace Apache.Iggy.Producer
-{
- public static class EncryptorData
- {
- private static readonly byte[] key =
- {
- 0x2b, 0x7e, 0x15, 0x16, 0x28, 0xae, 0xd2, 0xa6, 0xab, 0xf7,
- 0x15, 0x88, 0x09, 0xcf, 0x4f, 0x3c, 0xa8, 0x8d, 0x2d, 0x0a,
- 0x9f, 0x9d, 0xea, 0x43, 0x6c, 0x25, 0x17, 0x13, 0x20, 0x45,
- 0x78, 0xc8
- };
-
- private static readonly byte[] iv =
- {
- 0x5f, 0x8a, 0xe4, 0x78, 0x9c, 0x3d, 0x2b, 0x0f, 0x12, 0x6a,
- 0x7e, 0x45, 0x91, 0xba, 0xdf, 0x33
- };
-
- public static byte[] GetKey()
- {
- return key;
- }
-
- public static byte[] GetIv()
- {
- return iv;
- }
- }
-}
\ No newline at end of file
diff --git a/scripts/run-csharp-examples-from-readme.sh
b/scripts/run-csharp-examples-from-readme.sh
new file mode 100755
index 00000000..5f07840b
--- /dev/null
+++ b/scripts/run-csharp-examples-from-readme.sh
@@ -0,0 +1,210 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -euo pipefail
+
+# Script to run Csharp examples from examples/csharp/README.md files
+# Usage: ./scripts/run-csharp-examples-from-readme.sh [OPTIONS]
+#
+# --csos - Optional target OS (e.g., linux, darwin)
+# --csarch - Optional target architecture (e.g., amd64, arm64)
+# --target - Optional target architecture for rust (e.g.,
x86_64-unknown-linux-musl)
+#
+# This script will run all the commands from examples/csharp/README.md files
+# and check if they pass or fail.
+# If any command fails, it will print the command and exit with non-zero
status.
+# If all commands pass, it will remove the log file and exit with zero status.
+#
+# Note: This script assumes that the iggy-server is not running and will start
it in the background.
+# It will wait until the server is started before running the commands.
+# It will also terminate the server after running all the commands.
+# Script executes every command in README files which is enclosed in
backticks (`) and starts
+# with `dotnet run --project src/xxx`. Other commands are ignored.
+# Order of commands in README files is important as script will execute
them from top to bottom.
+#
+
+readonly LOG_FILE="iggy-server.log"
+readonly PID_FILE="iggy-server.pid"
+readonly TIMEOUT=300
+
+# Get target architecture from argument or use default
+
+CSOS="" # chsarp target OS
+CSARCH="" # csharp target architecture
+TARGET="" # Iggy server target architecture
+
+while [[ $# -gt 0 ]]; do
+ case "$1" in
+ --csharpos)
+ CSOS="$2"
+ shift 2
+ ;;
+ --csharparch)
+ CSARCH="$2"
+ shift 2
+ ;;
+ --target)
+ TARGET="$2"
+ shift 2
+ ;;
+ *)
+ echo "Unknown option: $1"
+ echo "Usage: $0 [--csos CSOS] [--csarch CSARCH] [--target TARGET]"
+ exit 1
+ ;;
+ esac
+done
+
+# Remove old server data if present
+test -d local_data && rm -fr local_data
+test -e ${LOG_FILE} && rm ${LOG_FILE}
+test -e ${PID_FILE} && rm ${PID_FILE}
+
+# Check if server binary exists
+SERVER_BIN=""
+if [ -n "${TARGET}" ]; then
+ SERVER_BIN="target/${TARGET}/debug/iggy-server"
+else
+ SERVER_BIN="target/debug/iggy-server"
+fi
+
+if [ ! -f "${SERVER_BIN}" ]; then
+ echo "Error: Server binary not found at ${SERVER_BIN}"
+ echo "Please build the server binary before running this script:"
+ if [ -n "${TARGET}" ]; then
+ echo " cargo build --target ${TARGET} --bin iggy-server"
+ else
+ echo " cargo build --bin iggy-server"
+ fi
+ exit 1
+fi
+
+echo "Using server binary at ${SERVER_BIN}"
+
+# Run iggy server using the prebuilt binary
+echo "Starting server from ${SERVER_BIN}..."
+IGGY_ROOT_USERNAME=iggy IGGY_ROOT_PASSWORD=iggy ${SERVER_BIN} &>${LOG_FILE} &
+echo $! >${PID_FILE}
+
+# Wait until "Iggy server has started" string is present inside iggy-server.log
+SERVER_START_TIME=0
+while ! grep -q "Iggy server has started" ${LOG_FILE}; do
+ if [ ${SERVER_START_TIME} -gt ${TIMEOUT} ]; then
+ echo "Server did not start within ${TIMEOUT} seconds."
+ ps fx
+ cat ${LOG_FILE}
+ exit 1
+ fi
+ echo "Waiting for Iggy server to start... ${SERVER_START_TIME}"
+ sleep 1
+ ((SERVER_START_TIME += 1))
+done
+
+# Execute all matching CLI commands from README.md and check if they pass or
fail
+while IFS= read -r command; do
+ # Remove backticks from command
+ command=$(echo "${command}" | tr -d '`')
+
+ # Add target flag if specified
+ if [ -n "${CSOS}" ]; then
+ command="${command//dotnet run /dotnet run --os ${CSOS} }"
+ fi
+
+ if [ -n "${CSARCH}" ]; then
+ command="${command//dotnet run /dotnet run --arch ${CSARCH} }"
+ fi
+
+ echo -e "\e[33mChecking CLI command:\e[0m ${command}"
+ echo ""
+
+ set +e
+ eval "${command}"
+ exit_code=$?
+ set -e
+
+ # Stop at first failure
+ if [ ${exit_code} -ne 0 ]; then
+ echo ""
+ echo -e "\e[31mCLI command failed:\e[0m ${command}"
+ echo ""
+ break
+ fi
+
+done < <(grep -E "^\`cargo r --bin iggy -- " README.md)
+
+# Execute all example commands from README.md and examples/rust/README.md and
check if they pass or fail
+for readme_file in README.md examples/csharp/README.md; do
+ if [ ! -f "${readme_file}" ]; then
+ continue
+ fi
+
+ while IFS= read -r command; do
+ # Remove backticks and comments from command
+ command=$(echo "${command}" | tr -d '`' | sed 's/^#.*//')
+ # Skip empty lines
+ if [ -z "${command}" ]; then
+ continue
+ fi
+
+ # Add target flag if specified
+ if [ -n "${CSOS}" ]; then
+ command="${command//dotnet run /dotnet run --os ${CSOS} }"
+ fi
+
+ if [ -n "${CSARCH}" ]; then
+ command="${command//dotnet run /dotnet run --arch ${CSARCH} }"
+ fi
+
+ echo -e "\e[33mChecking example command from ${readme_file}:\e[0m
${command}"
+ echo ""
+
+ set +e
+ eval "${command}"
+ exit_code=$?
+ set -e
+
+ # Stop at first failure
+ if [ ${exit_code} -ne 0 ]; then
+ echo ""
+ echo -e "\e[31mExample command failed:\e[0m ${command}"
+ echo ""
+ break 2 # Break from both loops
+ fi
+ # Add a small delay between examples to avoid potential race conditions
+ sleep 2
+
+ done < <(grep -E "^dotnet run --project" "${readme_file}")
+done
+
+# Terminate server
+kill -TERM "$(cat ${PID_FILE})"
+test -e ${PID_FILE} && rm ${PID_FILE}
+
+# If everything is ok remove log and pid files otherwise cat server log
+if [ "${exit_code}" -eq 0 ]; then
+ echo "Test passed"
+else
+ echo "Test failed, see log file:"
+ test -e ${LOG_FILE} && cat ${LOG_FILE}
+fi
+
+test -e ${LOG_FILE} && rm ${LOG_FILE}
+test -e ${PID_FILE} && rm ${PID_FILE}
+
+exit "${exit_code}"