This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 15d2f1572 ci: add pre-commit hooks for all SDKs (#2676)
15d2f1572 is described below
commit 15d2f15724fefbfee1f7701bf0ae04aef24f4707
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 4 11:05:03 2026 +0100
ci: add pre-commit hooks for all SDKs (#2676)
Developers can now validate code locally before pushing,
reducing CI feedback cycles. Hooks cover Go, Node.js, Java,
C#, Python, and web UI with the same tools and versions as
CI (e.g., golangci-lint v1.64.8).
Also aligns CI coverage with pre-commit: Go fmt/tidy, Python
ruff, and Node eslint now check bdd/ and examples/ dirs.
Web UI lint is now blocking since all issues are resolved.
---
.github/actions/go/pre-merge/action.yml | 30 ++++-
.github/actions/node-npm/pre-merge/action.yml | 8 ++
.../actions/python-maturin/pre-merge/action.yml | 24 ++--
.github/workflows/_test.yml | 3 +-
.pre-commit-config.yaml | 103 +++++++++++++--
bdd/go/go.sum | 4 -
bdd/go/tests/basic_messaging_test.go | 32 ++---
bdd/go/tests/tcp_test/consumers_steps.go | 8 +-
bdd/go/tests/tcp_test/stream_feature_create.go | 20 +--
bdd/go/tests/tcp_test/stream_feature_get_by_id.go | 8 +-
bdd/go/tests/tcp_test/stream_steps.go | 2 +-
bdd/go/tests/tcp_test/topic_steps.go | 8 +-
bdd/python/tests/conftest.py | 5 +-
bdd/python/tests/test_basic_messaging.py | 96 +++++++++-----
examples/go/getting-started/consumer/main.go | 8 +-
examples/go/getting-started/producer/main.go | 14 +-
examples/node/src/basic/producer.ts | 1 -
examples/node/src/getting-started/producer.ts | 1 -
.../go/benchmarks/send_messages_benchmark_test.go | 12 +-
.../binary_request_serializer.go | 146 ++++++++++-----------
.../create_stream_serializer.go | 16 +--
.../create_stream_serializer_test.go | 2 +-
.../create_topic_serializer.go | 2 +-
.../fetch_messages_request_serializer.go | 30 ++---
foreign/go/contracts/consumer.go | 2 +-
foreign/go/contracts/consumer_groups.go | 6 +-
foreign/go/iggycli/client.go | 34 ++---
foreign/go/samples/consumer/consumer.go | 20 +--
foreign/go/samples/producer/producer.go | 20 +--
foreign/go/tcp/tcp_consumer_group_managament.go | 6 +-
foreign/go/tcp/tcp_stream_managament.go | 2 +-
foreign/go/tcp/tcp_topic_managament.go | 2 +-
.../lib/components/Layouts/SettingsLayout.svelte | 20 +--
.../lib/components/Modals/InspectMessage.svelte | 6 +-
web/src/lib/components/Navbar.svelte | 24 +---
web/src/routes/auth/sign-in/+page.svelte | 2 +-
.../[partitionId=i32]/messages/+page.svelte | 6 +-
37 files changed, 426 insertions(+), 307 deletions(-)
diff --git a/.github/actions/go/pre-merge/action.yml
b/.github/actions/go/pre-merge/action.yml
index bcad456a0..44fb79b50 100644
--- a/.github/actions/go/pre-merge/action.yml
+++ b/.github/actions/go/pre-merge/action.yml
@@ -31,13 +31,31 @@ runs:
with:
enabled: "false"
+ - name: Format Check
+ shell: bash
+ if: inputs.task == 'lint'
+ run: |
+ for dir in foreign/go bdd/go examples/go; do
+ if [ -d "$dir" ]; then
+ echo "Checking formatting in $dir..."
+ if [ -n "$(gofmt -l "$dir")" ]; then
+ echo "::error::Go files in $dir are not formatted. Run 'gofmt -w
-s $dir'"
+ gofmt -d "$dir"
+ exit 1
+ fi
+ fi
+ done
+
- name: Tidy Check
shell: bash
if: inputs.task == 'test' || inputs.task == 'lint'
run: |
- cd foreign/go
- go mod tidy
- git diff --exit-code go.mod go.sum
+ for dir in foreign/go bdd/go examples/go; do
+ if [ -f "$dir/go.mod" ]; then
+ echo "Checking go mod tidy in $dir..."
+ (cd "$dir" && go mod tidy && git diff --exit-code go.mod go.sum)
+ fi
+ done
- name: Check Generated Code
shell: bash
@@ -69,7 +87,7 @@ runs:
if: inputs.task == 'lint'
uses: golangci/golangci-lint-action@v6
with:
- version: v1.61
+ version: v1.64.8
working-directory: foreign/go
args: --timeout=5m
@@ -77,7 +95,7 @@ runs:
if: inputs.task == 'lint' && hashFiles('bdd/go/go.mod') != ''
uses: golangci/golangci-lint-action@v6
with:
- version: v1.61
+ version: v1.64.8
working-directory: bdd/go
args: --timeout=5m
@@ -85,7 +103,7 @@ runs:
if: inputs.task == 'lint' && hashFiles('examples/go/go.mod') != ''
uses: golangci/golangci-lint-action@v6
with:
- version: v1.61
+ version: v1.64.8
working-directory: examples/go
args: --timeout=5m
diff --git a/.github/actions/node-npm/pre-merge/action.yml
b/.github/actions/node-npm/pre-merge/action.yml
index 8a6672441..1914c1b24 100644
--- a/.github/actions/node-npm/pre-merge/action.yml
+++ b/.github/actions/node-npm/pre-merge/action.yml
@@ -57,6 +57,14 @@ runs:
run: |
cd foreign/node
npm run lint
+
+ # Also lint examples/node if it exists
+ if [ -f "${GITHUB_WORKSPACE}/examples/node/package.json" ]; then
+ echo "Linting examples/node..."
+ cd "${GITHUB_WORKSPACE}/examples/node"
+ npm ci --ignore-scripts
+ npm run lint
+ fi
shell: bash
- name: Build
diff --git a/.github/actions/python-maturin/pre-merge/action.yml
b/.github/actions/python-maturin/pre-merge/action.yml
index dcd1fe89b..bc8020d98 100644
--- a/.github/actions/python-maturin/pre-merge/action.yml
+++ b/.github/actions/python-maturin/pre-merge/action.yml
@@ -64,23 +64,27 @@ runs:
- name: Lint and format check
if: inputs.task == 'lint'
run: |
- # Build list of directories to check using absolute paths
+ # Build list of directories to check
DIR_SDK="${GITHUB_WORKSPACE}/foreign/python"
- DIR_TEST="${DIR_SDK}/tests"
- DIRS_TO_CHECK="${DIR_TEST} ${DIR_SDK}"
- STUB_FILE="${DIR_SDK}/apache_iggy.pyi"
+ DIR_BDD="${GITHUB_WORKSPACE}/bdd/python"
+ DIR_EXAMPLES="${GITHUB_WORKSPACE}/examples/python"
- echo "Directories to check: $DIRS_TO_CHECK"
- echo "Stub file: $STUB_FILE"
+ # Collect all Python directories that exist
+ DIRS_TO_CHECK="$DIR_SDK"
+ [ -d "$DIR_BDD" ] && DIRS_TO_CHECK="$DIRS_TO_CHECK $DIR_BDD"
+ [ -d "$DIR_EXAMPLES" ] && DIRS_TO_CHECK="$DIRS_TO_CHECK $DIR_EXAMPLES"
- echo "ruff check --select I $DIRS_TO_CHECK"
- ruff check --select I $DIRS_TO_CHECK
+ echo "Directories to check: $DIRS_TO_CHECK"
echo "ruff version: $(ruff --version)"
- echo "ruff format --check $DIRS_TO_CHECK"
+ echo "Running ruff check..."
+ ruff check $DIRS_TO_CHECK
+
+ echo "Running ruff format --check..."
ruff format --check $DIRS_TO_CHECK
- echo "mypy --explicit-package-bases $DIR_SDK"
+ # mypy only for the SDK (has type stubs)
+ echo "Running mypy on SDK..."
mypy --explicit-package-bases "$DIR_SDK"
echo "mypy version: $(mypy --version)"
shell: bash
diff --git a/.github/workflows/_test.yml b/.github/workflows/_test.yml
index b1e3ac67e..c6a95ea71 100644
--- a/.github/workflows/_test.yml
+++ b/.github/workflows/_test.yml
@@ -103,8 +103,7 @@ jobs:
cd web
npm ci
if [ "${{ inputs.task }}" = "lint" ]; then
- # TODO(hubcio): make this blocking once Web UI lints are fixed
- npm run lint || true
+ npm run lint
elif [ "${{ inputs.task }}" = "build" ]; then
npm run build
fi
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9aa44c45b..5fb429155 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -27,15 +27,15 @@ repos:
exclude: ^helm/charts/.*/templates/
- id: mixed-line-ending
- # Python SDK formatting
+ # Python formatting
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.0
hooks:
- id: ruff-check
args: [--fix]
- files: ^foreign/python/.*\.(py|pyi)$
+ files: ^(foreign|bdd|examples)/python/.*\.(py|pyi)$
- id: ruff-format
- files: ^foreign/python/.*\.(py|pyi)$
+ files: ^(foreign|bdd|examples)/python/.*\.(py|pyi)$
# CI scripts
- repo: local
@@ -146,9 +146,94 @@ repos:
pass_filenames: false
stages: [pre-push]
- # TODO(hubcio): add fast checks, linters, formatters for other
languages
- # - python (maturin, pytest, ruff, black)
- # - java
- # - go
- # - csharp
- # - js
+ # Go SDK formatting
+ - repo: local
+ hooks:
+ - id: go-fmt
+ name: go fmt
+ entry: bash -c 'for dir in foreign/go bdd/go examples/go; do [ -d
"$dir" ] && (cd "$dir" && gofmt -w -s .); done'
+ language: system
+ files: ^(foreign|bdd|examples)/go/.*\.go$
+ pass_filenames: false
+
+ - id: go-mod-tidy
+ name: go mod tidy
+ entry: bash -c 'for dir in foreign/go bdd/go examples/go; do [ -f
"$dir/go.mod" ] && (cd "$dir" && go mod tidy && git diff --exit-code go.mod
go.sum); done'
+ language: system
+ files: ^(foreign|bdd|examples)/go/(.*\.go|go\.mod|go\.sum)$
+ pass_filenames: false
+
+ # Go SDK linting (version must match CI:
.github/actions/go/pre-merge/action.yml)
+ - repo: local
+ hooks:
+ - id: golangci-lint-foreign
+ name: golangci-lint (foreign/go)
+ entry: bash -c 'cd foreign/go && golangci-lint run --timeout=5m ./...'
+ language: golang
+ additional_dependencies:
+ ["github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.8"]
+ files: ^foreign/go/.*\.go$
+ pass_filenames: false
+ stages: [pre-push]
+
+ - id: golangci-lint-bdd
+ name: golangci-lint (bdd/go)
+ entry: bash -c 'cd bdd/go && golangci-lint run --timeout=5m ./...'
+ language: golang
+ additional_dependencies:
+ ["github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.8"]
+ files: ^bdd/go/.*\.go$
+ pass_filenames: false
+ stages: [pre-push]
+
+ - id: golangci-lint-examples
+ name: golangci-lint (examples/go)
+ entry: bash -c 'cd examples/go && golangci-lint run --timeout=5m ./...'
+ language: golang
+ additional_dependencies:
+ ["github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.8"]
+ files: ^examples/go/.*\.go$
+ pass_filenames: false
+ stages: [pre-push]
+
+ # Node.js linting
+ - repo: local
+ hooks:
+ - id: node-eslint
+ name: node eslint
+ entry: bash -c 'for dir in foreign/node examples/node; do [ -f
"$dir/package.json" ] && (cd "$dir" && npm run lint); done'
+ language: system
+ files: ^(foreign|examples)/node/.*\.ts$
+ pass_filenames: false
+
+ # Web UI linting (Svelte + TypeScript)
+ - repo: local
+ hooks:
+ - id: web-lint
+ name: web lint
+ entry: bash -c 'cd web && npm run lint'
+ language: system
+ files: ^web/.*\.(ts|js|svelte)$
+ pass_filenames: false
+
+ # Java SDK linting (Checkstyle + Spotless via Gradle)
+ - repo: local
+ hooks:
+ - id: java-gradle-check
+ name: java gradle check
+ entry: bash -c 'for dir in foreign/java bdd/java examples/java; do [
-f "$dir/build.gradle.kts" ] && (cd "$dir" && ./gradlew check -x test); done'
+ language: system
+ files: ^(foreign|bdd|examples)/java/.*\.(java|kt|kts)$
+ pass_filenames: false
+ stages: [pre-push]
+
+ # C# SDK formatting
+ - repo: local
+ hooks:
+ - id: csharp-dotnet-format
+ name: csharp dotnet format
+ entry: bash -c 'dotnet format foreign/csharp/Iggy_SDK.sln
--verify-no-changes && dotnet format examples/csharp/Iggy_SDK.Examples.sln
--verify-no-changes'
+ language: system
+ files: ^(foreign|examples)/csharp/.*\.cs$
+ pass_filenames: false
+ stages: [pre-push]
diff --git a/bdd/go/go.sum b/bdd/go/go.sum
index 86932a34e..ca176e765 100644
--- a/bdd/go/go.sum
+++ b/bdd/go/go.sum
@@ -1,7 +1,6 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod
h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cucumber/gherkin/go/v26 v26.2.0
h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI=
github.com/cucumber/gherkin/go/v26 v26.2.0/go.mod
h1:t2GAPnB8maCT4lkHL99BDCVNzCh1d7dBhCLt150Nr/0=
-github.com/cucumber/godog v0.15.0/go.mod
h1:FX3rzIDybWABU4kuIXLZ/qtqEe1Ac5RdXmqvACJOces=
github.com/cucumber/godog v0.15.1
h1:rb/6oHDdvVZKS66hrhpjFQFHjthFSrQBCOI1LwshNTI=
github.com/cucumber/godog v0.15.1/go.mod
h1:qju+SQDewOljHuq9NSM66s0xEhogx0q30flfxL4WUk8=
github.com/cucumber/messages/go/v21 v21.0.1
h1:wzA0LxwjlWQYZd32VTlAVDTkW6inOFmSM+RuOwHZiMI=
@@ -15,7 +14,6 @@ github.com/go-logr/logr v1.4.2/go.mod
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4
github.com/go-task/slim-sprig/v3 v3.0.0
h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod
h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
-github.com/gofrs/uuid v4.3.1+incompatible
h1:0/KbAdpx3UXAx1kEOWHJeOkpbgRFGHVgv+CFIY7dBJI=
github.com/gofrs/uuid v4.3.1+incompatible/go.mod
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.4.0+incompatible
h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod
h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
@@ -28,7 +26,6 @@ github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/hashicorp/go-immutable-radix v1.3.0/go.mod
h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-immutable-radix v1.3.1
h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod
h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
-github.com/hashicorp/go-memdb v1.3.4
h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c=
github.com/hashicorp/go-memdb v1.3.4/go.mod
h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
github.com/hashicorp/go-memdb v1.3.5
h1:b3taDMxCBCBVgyRrS1AZVHO14ubMYZB++QpNhBg+Nyo=
github.com/hashicorp/go-memdb v1.3.5/go.mod
h1:8IVKKBkVe+fxFgdFOYxzQQNjz+sWCyHCdIC/+5+Vy1Y=
@@ -36,7 +33,6 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-uuid v1.0.2
h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
-github.com/hashicorp/golang-lru v0.5.4
h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v1.0.2
h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
diff --git a/bdd/go/tests/basic_messaging_test.go
b/bdd/go/tests/basic_messaging_test.go
index 219886f28..d05d07d34 100644
--- a/bdd/go/tests/basic_messaging_test.go
+++ b/bdd/go/tests/basic_messaging_test.go
@@ -223,7 +223,7 @@ func givenNoStreams(ctx context.Context) error {
func whenCreateStream(ctx context.Context, streamName string) error {
c := getBasicMessagingCtx(ctx)
- stream, err := c.client.CreateStream(streamName)
+ stream, err := c.client.CreateStream(streamName)
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
@@ -240,8 +240,8 @@ func thenStreamCreatedSuccessfully(ctx context.Context)
error {
}
func thenStreamHasName(
- ctx context.Context,
- expectedName string) error {
+ ctx context.Context,
+ expectedName string) error {
c := getBasicMessagingCtx(ctx)
streamName := *c.lastStreamName
if streamName != expectedName {
@@ -251,20 +251,20 @@ func thenStreamHasName(
}
func whenCreateTopic(
- ctx context.Context,
- topicName string,
- streamID uint32,
- partitionsCount uint32) error {
+ ctx context.Context,
+ topicName string,
+ streamID uint32,
+ partitionsCount uint32) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(streamID)
- topic, err := c.client.CreateTopic(
+ topic, err := c.client.CreateTopic(
streamIdentifier,
topicName,
partitionsCount,
iggcon.CompressionAlgorithmNone,
iggcon.IggyExpiryNeverExpire,
- 0,
- nil,
+ 0,
+ nil,
)
if err != nil {
return fmt.Errorf("failed to create topic: %w", err)
@@ -284,8 +284,8 @@ func thenTopicCreatedSuccessfully(ctx context.Context)
error {
return nil
}
func thenTopicHasName(
- ctx context.Context,
- expectedName string) error {
+ ctx context.Context,
+ expectedName string) error {
c := getBasicMessagingCtx(ctx)
topicName := *c.lastTopicName
if topicName != expectedName {
@@ -315,13 +315,13 @@ func initScenarios(sc *godog.ScenarioContext) {
sc.Step(`^the messages should have sequential offsets from (\d+) to
(\d+)$`, thenMessagesHaveSequentialOffsets)
sc.Step(`each message should have the expected payload content`,
thenMessagesHaveExpectedPayload)
sc.Step(`the last polled message should match the last sent message`,
thenLastPolledMessageMatchesSent)
- sc.Step(`^the stream should have name "([^"]*)"$`, thenStreamHasName)
+ sc.Step(`^the stream should have name "([^"]*)"$`, thenStreamHasName)
sc.Step(`the stream should be created successfully`,
thenStreamCreatedSuccessfully)
- sc.Step(`^I create a stream with name "([^"]*)"$`, whenCreateStream)
+ sc.Step(`^I create a stream with name "([^"]*)"$`, whenCreateStream)
sc.Step(`I have no streams in the system`, givenNoStreams)
- sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+)
partitions$`, whenCreateTopic)
+ sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with
(\d+) partitions$`, whenCreateTopic)
sc.Step(`the topic should be created successfully`,
thenTopicCreatedSuccessfully)
- sc.Step(`^the topic should have name "([^"]*)"$`, thenTopicHasName)
+ sc.Step(`^the topic should have name "([^"]*)"$`, thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`,
thenTopicsHasPartitions)
}
diff --git a/bdd/go/tests/tcp_test/consumers_steps.go
b/bdd/go/tests/tcp_test/consumers_steps.go
index 890175004..367aa4563 100644
--- a/bdd/go/tests/tcp_test/consumers_steps.go
+++ b/bdd/go/tests/tcp_test/consumers_steps.go
@@ -32,12 +32,12 @@ func successfullyCreateConsumer(streamId uint32, topicId
uint32, cli iggycli.Cli
name := createRandomString(16)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
- group, err := cli.CreateConsumerGroup(
+ group, err := cli.CreateConsumerGroup(
streamIdentifier,
topicIdentifier,
- name)
- groupId := group.Id
- itShouldSuccessfullyCreateConsumer(streamId, topicId, groupId, name, cli)
+ name)
+ groupId := group.Id
+ itShouldSuccessfullyCreateConsumer(streamId, topicId, groupId, name,
cli)
itShouldNotReturnError(err)
return groupId, name
}
diff --git a/bdd/go/tests/tcp_test/stream_feature_create.go
b/bdd/go/tests/tcp_test/stream_feature_create.go
index 18d8243ed..92fc5e25e 100644
--- a/bdd/go/tests/tcp_test/stream_feature_create.go
+++ b/bdd/go/tests/tcp_test/stream_feature_create.go
@@ -28,7 +28,7 @@ var _ = ginkgo.Describe("CREATE STREAM:", func() {
client := createAuthorizedConnection()
name := createRandomString(32)
- stream, err := client.CreateStream(name)
+ stream, err := client.CreateStream(name)
defer deleteStreamAfterTests(stream.Id, client)
itShouldNotReturnError(err)
@@ -39,31 +39,31 @@ var _ = ginkgo.Describe("CREATE STREAM:", func() {
client := createAuthorizedConnection()
name := createRandomString(32)
- stream, err := client.CreateStream(name)
+ stream, err := client.CreateStream(name)
defer deleteStreamAfterTests(stream.Id, client)
itShouldNotReturnError(err)
itShouldSuccessfullyCreateStream(stream.Id, name,
client)
- _, err = client.CreateStream(name)
+ _, err = client.CreateStream(name)
itShouldReturnSpecificError(err,
ierror.ErrStreamNameAlreadyExists)
})
ginkgo.Context("and tries to create stream name that's over 255
characters", func() {
- client := createAuthorizedConnection()
- name := createRandomString(256)
+ client := createAuthorizedConnection()
+ name := createRandomString(256)
- _, err := client.CreateStream(name)
+ _, err := client.CreateStream(name)
- itShouldReturnSpecificError(err, ierror.ErrInvalidStreamName)
- })
+ itShouldReturnSpecificError(err,
ierror.ErrInvalidStreamName)
+ })
})
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to create stream", func() {
- client := createClient()
- _, err := client.CreateStream(createRandomString(32))
+ client := createClient()
+ _, err := client.CreateStream(createRandomString(32))
itShouldReturnUnauthenticatedError(err)
})
diff --git a/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
b/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
index d2bf125af..60e082771 100644
--- a/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
+++ b/bdd/go/tests/tcp_test/stream_feature_get_by_id.go
@@ -57,22 +57,22 @@ var _ = ginkgo.Describe("GET STREAM BY ID:", func() {
// create two topics
t1Name := createRandomString(32)
t2Name := createRandomString(32)
- t1, err := client.CreateTopic(streamIdentifier,
+ t1, err := client.CreateTopic(streamIdentifier,
t1Name,
2,
iggcon.CompressionAlgorithmNone,
iggcon.Millisecond,
math.MaxUint64,
- nil)
+ nil)
itShouldNotReturnError(err)
- t2, err := client.CreateTopic(
+ t2, err := client.CreateTopic(
streamIdentifier,
t2Name,
2,
iggcon.CompressionAlgorithmNone,
iggcon.Millisecond,
math.MaxUint64,
- nil)
+ nil)
itShouldNotReturnError(err)
itShouldSuccessfullyCreateTopic(streamId, t1.Id,
t1Name, client)
itShouldSuccessfullyCreateTopic(streamId, t2.Id,
t2Name, client)
diff --git a/bdd/go/tests/tcp_test/stream_steps.go
b/bdd/go/tests/tcp_test/stream_steps.go
index 9384b0dd2..92e1812ce 100644
--- a/bdd/go/tests/tcp_test/stream_steps.go
+++ b/bdd/go/tests/tcp_test/stream_steps.go
@@ -31,7 +31,7 @@ import (
func successfullyCreateStream(prefix string, client iggycli.Client) (uint32,
string) {
name := createRandomStringWithPrefix(prefix, 128)
- stream, err := client.CreateStream(name)
+ stream, err := client.CreateStream(name)
itShouldNotReturnError(err)
itShouldSuccessfullyCreateStream(stream.Id, name, client)
diff --git a/bdd/go/tests/tcp_test/topic_steps.go
b/bdd/go/tests/tcp_test/topic_steps.go
index 488153ced..fb9bf52b9 100644
--- a/bdd/go/tests/tcp_test/topic_steps.go
+++ b/bdd/go/tests/tcp_test/topic_steps.go
@@ -34,18 +34,18 @@ func successfullyCreateTopic(streamId uint32, client
iggycli.Client) (uint32, st
replicationFactor := uint8(1)
name := createRandomString(128)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
- topic, err := client.CreateTopic(
+ topic, err := client.CreateTopic(
streamIdentifier,
name,
2,
1,
0,
math.MaxUint64,
- &replicationFactor)
+ &replicationFactor)
- itShouldSuccessfullyCreateTopic(streamId, topic.Id, name, client)
+ itShouldSuccessfullyCreateTopic(streamId, topic.Id, name, client)
itShouldNotReturnError(err)
- return topic.Id, name
+ return topic.Id, name
}
//assertions
diff --git a/bdd/python/tests/conftest.py b/bdd/python/tests/conftest.py
index 8cf4a8f55..d8d760b43 100644
--- a/bdd/python/tests/conftest.py
+++ b/bdd/python/tests/conftest.py
@@ -18,6 +18,7 @@
"""
BDD test configuration and fixtures for Python SDK tests.
"""
+
import asyncio
import os
import pytest
@@ -62,5 +63,5 @@ def context():
if ctx.client:
try:
asyncio.run(ctx.client.disconnect())
- except:
- pass # Ignore cleanup errors
+ except Exception:
+ pass
diff --git a/bdd/python/tests/test_basic_messaging.py
b/bdd/python/tests/test_basic_messaging.py
index 21936baf6..445c02a97 100644
--- a/bdd/python/tests/test_basic_messaging.py
+++ b/bdd/python/tests/test_basic_messaging.py
@@ -18,21 +18,23 @@
"""
Basic messaging BDD test implementation for Python SDK
"""
+
import asyncio
import socket
from pytest_bdd import scenarios, given, when, then, parsers
from apache_iggy import IggyClient, SendMessage, PollingStrategy
# Load scenarios from the shared feature file
-scenarios('/app/features/basic_messaging.feature')
+scenarios("/app/features/basic_messaging.feature")
-@given('I have a running Iggy server')
+@given("I have a running Iggy server")
def running_server(context):
"""Ensure we have a running Iggy server and create client"""
+
async def _connect():
# Resolve hostname to IP if needed
- host, port = context.server_addr.split(':')
+ host, port = context.server_addr.split(":")
try:
# Try to resolve hostname to IP
ip_addr = socket.gethostbyname(host)
@@ -48,16 +50,17 @@ def running_server(context):
asyncio.run(_connect())
-@given('I am authenticated as the root user')
+@given("I am authenticated as the root user")
def authenticated_root_user(context):
"""Authenticate as root user"""
+
async def _login():
await context.client.login_user("iggy", "iggy")
asyncio.run(_login())
-@given('I have no streams in the system')
+@given("I have no streams in the system")
def no_streams_in_system(context):
"""Ensure no streams exist in the system"""
# With --fresh flag on server, this should already be clean
@@ -65,25 +68,27 @@ def no_streams_in_system(context):
pass
-@when(parsers.parse('I create a stream with name {stream_name}'))
+@when(parsers.parse("I create a stream with name {stream_name}"))
def create_stream(context, stream_name):
- """Create a stream with specified name"""
+ """Create a stream with specified name"""
+
async def _create():
await context.client.create_stream(name=stream_name)
-
+
stream = await context.client.get_stream(stream_name)
if stream is None:
raise RuntimeError(f"Stream {stream_name} was not found after
creation")
-
+
context.last_stream_id = stream.id
context.last_stream_name = stream_name
asyncio.run(_create())
-@then('the stream should be created successfully')
+@then("the stream should be created successfully")
def stream_created_successfully(context):
"""Verify stream was created successfully"""
+
async def _verify():
stream = await context.client.get_stream(context.last_stream_name)
assert stream is not None
@@ -91,9 +96,10 @@ def stream_created_successfully(context):
asyncio.run(_verify())
-@then(parsers.parse('the stream should have name {stream_name}'))
+@then(parsers.parse("the stream should have name {stream_name}"))
def verify_stream_properties(context, stream_name):
"""Verify stream has correct name"""
+
async def _verify():
stream = await context.client.get_stream(stream_name)
assert stream is not None
@@ -102,20 +108,23 @@ def verify_stream_properties(context, stream_name):
asyncio.run(_verify())
-@when(parsers.parse('I create a topic with name {topic_name} in stream
{stream_id:d} with {partitions:d} partitions'))
+@when(
+ parsers.parse(
+ "I create a topic with name {topic_name} in stream {stream_id:d} with
{partitions:d} partitions"
+ )
+)
def create_topic(context, topic_name, stream_id, partitions):
"""Create a topic with specified parameters"""
+
async def _create():
await context.client.create_topic(
- stream=stream_id,
- name=topic_name,
- partitions_count=partitions
+ stream=stream_id, name=topic_name, partitions_count=partitions
)
-
+
topic = await context.client.get_topic(stream_id, topic_name)
if topic is None:
raise RuntimeError(f"Topic {topic_name} was not found after
creation")
-
+
context.last_topic_id = topic.id
context.last_topic_name = topic_name
context.last_topic_partitions = partitions
@@ -123,19 +132,23 @@ def create_topic(context, topic_name, stream_id,
partitions):
asyncio.run(_create())
-@then('the topic should be created successfully')
+@then("the topic should be created successfully")
def topic_created_successfully(context):
"""Verify topic was created successfully"""
+
async def _verify():
- topic = await context.client.get_topic(context.last_stream_id,
context.last_topic_name)
+ topic = await context.client.get_topic(
+ context.last_stream_id, context.last_topic_name
+ )
assert topic is not None
asyncio.run(_verify())
-@then(parsers.parse('the topic should have name {topic_name}'))
+@then(parsers.parse("the topic should have name {topic_name}"))
def verify_topic_properties(context, topic_name):
"""Verify topic has correct name"""
+
async def _verify():
topic = await context.client.get_topic(context.last_stream_id,
topic_name)
assert topic is not None
@@ -144,20 +157,28 @@ def verify_topic_properties(context, topic_name):
asyncio.run(_verify())
-@then(parsers.parse('the topic should have {partitions:d} partitions'))
+@then(parsers.parse("the topic should have {partitions:d} partitions"))
def verify_topic_partitions(context, partitions):
"""Verify topic has correct number of partitions"""
+
async def _verify():
- topic = await context.client.get_topic(context.last_stream_id,
context.last_topic_name)
+ topic = await context.client.get_topic(
+ context.last_stream_id, context.last_topic_name
+ )
assert topic is not None
assert topic.partitions_count == partitions
asyncio.run(_verify())
-@when(parsers.parse('I send {message_count:d} messages to stream
{stream_id:d}, topic {topic_id:d}, partition {partition_id:d}'))
+@when(
+ parsers.parse(
+ "I send {message_count:d} messages to stream {stream_id:d}, topic
{topic_id:d}, partition {partition_id:d}"
+ )
+)
def send_messages(context, message_count, stream_id, topic_id, partition_id):
"""Send messages to specified stream, topic, and partition"""
+
async def _send():
messages = []
for i in range(message_count):
@@ -168,7 +189,7 @@ def send_messages(context, message_count, stream_id,
topic_id, partition_id):
stream=stream_id,
topic=topic_id,
partitioning=partition_id,
- messages=messages
+ messages=messages,
)
# Store the last sent message content for comparison
@@ -178,16 +199,21 @@ def send_messages(context, message_count, stream_id,
topic_id, partition_id):
asyncio.run(_send())
-@then('all messages should be sent successfully')
+@then("all messages should be sent successfully")
def messages_sent_successfully(context):
"""Verify all messages were sent successfully"""
# If we got here without exception, messages were sent successfully
assert context.last_sent_message is not None
-@when(parsers.parse('I poll messages from stream {stream_id:d}, topic
{topic_id:d}, partition {partition_id:d} starting from offset
{start_offset:d}'))
+@when(
+ parsers.parse(
+ "I poll messages from stream {stream_id:d}, topic {topic_id:d},
partition {partition_id:d} starting from offset {start_offset:d}"
+ )
+)
def poll_messages(context, stream_id, topic_id, partition_id, start_offset):
"""Poll messages from specified location"""
+
async def _poll():
context.last_polled_messages = await context.client.poll_messages(
stream=stream_id,
@@ -195,20 +221,24 @@ def poll_messages(context, stream_id, topic_id,
partition_id, start_offset):
partition_id=partition_id,
polling_strategy=PollingStrategy.Offset(value=start_offset),
count=100, # Poll up to 100 messages
- auto_commit=True
+ auto_commit=True,
)
asyncio.run(_poll())
-@then(parsers.parse('I should receive {expected_count:d} messages'))
+@then(parsers.parse("I should receive {expected_count:d} messages"))
def verify_message_count(context, expected_count):
"""Verify correct number of messages received"""
assert context.last_polled_messages is not None
assert len(context.last_polled_messages) == expected_count
-@then(parsers.parse('the messages should have sequential offsets from
{start_offset:d} to {end_offset:d}'))
+@then(
+ parsers.parse(
+ "the messages should have sequential offsets from {start_offset:d} to
{end_offset:d}"
+ )
+)
def verify_sequential_offsets(context, start_offset, end_offset):
"""Verify messages have sequential offsets"""
assert context.last_polled_messages is not None
@@ -221,24 +251,24 @@ def verify_sequential_offsets(context, start_offset,
end_offset):
assert last_message.offset() == end_offset
-@then('each message should have the expected payload content')
+@then("each message should have the expected payload content")
def verify_payload_content(context):
"""Verify each message has expected payload content"""
assert context.last_polled_messages is not None
for i, message in enumerate(context.last_polled_messages):
expected_payload = f"test message {i}"
- actual_payload = message.payload().decode('utf-8')
+ actual_payload = message.payload().decode("utf-8")
assert actual_payload == expected_payload
-@then('the last polled message should match the last sent message')
+@then("the last polled message should match the last sent message")
def verify_last_message_match(context):
"""Verify last polled message matches last sent message"""
assert context.last_sent_message is not None
assert context.last_polled_messages is not None
last_polled = context.last_polled_messages[-1]
- last_polled_payload = last_polled.payload().decode('utf-8')
+ last_polled_payload = last_polled.payload().decode("utf-8")
assert last_polled_payload == context.last_sent_message
diff --git a/examples/go/getting-started/consumer/main.go
b/examples/go/getting-started/consumer/main.go
index 1aa75f5fa..1435dae6d 100644
--- a/examples/go/getting-started/consumer/main.go
+++ b/examples/go/getting-started/consumer/main.go
@@ -29,10 +29,10 @@ import (
)
var (
- StreamID = uint32(0)
- TopicID = uint32(0)
- PartitionID = uint32(0)
- BatchesLimit = uint32(5)
+ StreamID = uint32(0)
+ TopicID = uint32(0)
+ PartitionID = uint32(0)
+ BatchesLimit = uint32(5)
)
func main() {
diff --git a/examples/go/getting-started/producer/main.go
b/examples/go/getting-started/producer/main.go
index 9c12c6f96..bbd8df08f 100644
--- a/examples/go/getting-started/producer/main.go
+++ b/examples/go/getting-started/producer/main.go
@@ -31,10 +31,10 @@ import (
)
var (
- StreamId = uint32(0)
- TopicId = uint32(0)
- PartitionId = uint32(0)
- BatchesLimit = uint32(5)
+ StreamId = uint32(0)
+ TopicId = uint32(0)
+ PartitionId = uint32(0)
+ BatchesLimit = uint32(5)
)
func main() {
@@ -57,20 +57,20 @@ func main() {
}
func initSystem(client iggycli.Client) {
- if _, err := client.CreateStream("sample-stream"); err != nil {
+ if _, err := client.CreateStream("sample-stream"); err != nil {
log.Printf("WARN: Stream already exists or error: %v", err)
}
log.Println("Stream was created.")
streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
- if _, err := client.CreateTopic(
+ if _, err := client.CreateTopic(
streamIdentifier,
"sample-topic",
1,
iggcon.CompressionAlgorithmNone,
iggcon.IggyExpiryNeverExpire,
0,
- nil); err != nil {
+ nil); err != nil {
log.Printf("WARN: Topic already exists and will not be created
again or error: %v", err)
}
log.Println("Topic was created.")
diff --git a/examples/node/src/basic/producer.ts
b/examples/node/src/basic/producer.ts
index d2d21e396..e7ed52647 100644
--- a/examples/node/src/basic/producer.ts
+++ b/examples/node/src/basic/producer.ts
@@ -18,7 +18,6 @@
*/
import { Client, Partitioning } from "apache-iggy";
-import crypto from "crypto";
import debug from "debug";
import {
PARTITION_COUNT,
diff --git a/examples/node/src/getting-started/producer.ts
b/examples/node/src/getting-started/producer.ts
index ce472e4b5..c08f95c2b 100644
--- a/examples/node/src/getting-started/producer.ts
+++ b/examples/node/src/getting-started/producer.ts
@@ -18,7 +18,6 @@
*/
import { Client, Partitioning } from "apache-iggy";
-import debug from "debug";
import {
BATCHES_LIMIT,
cleanup,
diff --git a/foreign/go/benchmarks/send_messages_benchmark_test.go
b/foreign/go/benchmarks/send_messages_benchmark_test.go
index 69b39b153..85d6ebb92 100644
--- a/foreign/go/benchmarks/send_messages_benchmark_test.go
+++ b/foreign/go/benchmarks/send_messages_benchmark_test.go
@@ -105,23 +105,23 @@ func BenchmarkSendMessage(b *testing.B) {
func ensureInfrastructureIsInitialized(cli iggycli.Client, streamId uint32)
error {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
- if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
- _, streamErr = cli.CreateStream("benchmark"+fmt.Sprint(streamId))
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
+ _, streamErr = cli.CreateStream("benchmark" +
fmt.Sprint(streamId))
if streamErr != nil {
panic(streamErr)
}
}
- topicIdentifier, _ := iggcon.NewIdentifier(uint32(0))
- if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
- _, topicErr = cli.CreateTopic(
+ topicIdentifier, _ := iggcon.NewIdentifier(uint32(0))
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
+ _, topicErr = cli.CreateTopic(
streamIdentifier,
"benchmark",
1,
iggcon.CompressionAlgorithmNone,
iggcon.IggyExpiryServerDefault,
1,
- nil,
+ nil,
)
if topicErr != nil {
diff --git a/foreign/go/binary_serialization/binary_request_serializer.go
b/foreign/go/binary_serialization/binary_request_serializer.go
index 89c9ea637..412aae18c 100644
--- a/foreign/go/binary_serialization/binary_request_serializer.go
+++ b/foreign/go/binary_serialization/binary_request_serializer.go
@@ -24,88 +24,88 @@ import (
)
func CreateGroup(request iggcon.CreateConsumerGroupRequest) []byte {
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- offset := len(streamIdBytes) + len(topicIdBytes)
- bytes := make([]byte, offset+1+len(request.Name))
- copy(bytes[0:len(streamIdBytes)], streamIdBytes)
- copy(bytes[len(streamIdBytes):offset], topicIdBytes)
- bytes[offset] = byte(len(request.Name))
- copy(bytes[offset+1:], request.Name)
- return bytes
+ streamIdBytes := SerializeIdentifier(request.StreamId)
+ topicIdBytes := SerializeIdentifier(request.TopicId)
+ offset := len(streamIdBytes) + len(topicIdBytes)
+ bytes := make([]byte, offset+1+len(request.Name))
+ copy(bytes[0:len(streamIdBytes)], streamIdBytes)
+ copy(bytes[len(streamIdBytes):offset], topicIdBytes)
+ bytes[offset] = byte(len(request.Name))
+ copy(bytes[offset+1:], request.Name)
+ return bytes
}
func UpdateOffset(request iggcon.StoreConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4) +
offset(8)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+13)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- binary.LittleEndian.PutUint64(bytes[position+5:position+13],
uint64(request.Offset))
- return bytes
+ hasPartition := byte(0)
+ var partition uint32 = 0
+ if request.PartitionId != nil {
+ hasPartition = 1
+ partition = *request.PartitionId
+ }
+ consumerBytes := SerializeConsumer(request.Consumer)
+ streamIdBytes := SerializeIdentifier(request.StreamId)
+ topicIdBytes := SerializeIdentifier(request.TopicId)
+ // consumer + stream_id + topic_id + hasPartition(1) + partition(4) +
offset(8)
+ bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+13)
+ position := 0
+ copy(bytes[position:], consumerBytes)
+ position += len(consumerBytes)
+ copy(bytes[position:], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:], topicIdBytes)
+ position += len(topicIdBytes)
+ bytes[position] = hasPartition
+ binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
+ binary.LittleEndian.PutUint64(bytes[position+5:position+13],
uint64(request.Offset))
+ return bytes
}
func GetOffset(request iggcon.GetConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- return bytes
+ hasPartition := byte(0)
+ var partition uint32 = 0
+ if request.PartitionId != nil {
+ hasPartition = 1
+ partition = *request.PartitionId
+ }
+ consumerBytes := SerializeConsumer(request.Consumer)
+ streamIdBytes := SerializeIdentifier(request.StreamId)
+ topicIdBytes := SerializeIdentifier(request.TopicId)
+ // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
+ bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
+ position := 0
+ copy(bytes[position:], consumerBytes)
+ position += len(consumerBytes)
+ copy(bytes[position:], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:], topicIdBytes)
+ position += len(topicIdBytes)
+ bytes[position] = hasPartition
+ binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
+ return bytes
}
func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- return bytes
+ hasPartition := byte(0)
+ var partition uint32 = 0
+ if request.PartitionId != nil {
+ hasPartition = 1
+ partition = *request.PartitionId
+ }
+ consumerBytes := SerializeConsumer(request.Consumer)
+ streamIdBytes := SerializeIdentifier(request.StreamId)
+ topicIdBytes := SerializeIdentifier(request.TopicId)
+ // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
+ bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
+ position := 0
+ copy(bytes[position:], consumerBytes)
+ position += len(consumerBytes)
+ copy(bytes[position:], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:], topicIdBytes)
+ position += len(topicIdBytes)
+ bytes[position] = hasPartition
+ binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
+ return bytes
}
func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte {
diff --git a/foreign/go/binary_serialization/create_stream_serializer.go
b/foreign/go/binary_serialization/create_stream_serializer.go
index cf4ba406c..be12c4b16 100644
--- a/foreign/go/binary_serialization/create_stream_serializer.go
+++ b/foreign/go/binary_serialization/create_stream_serializer.go
@@ -18,18 +18,18 @@
package binaryserialization
type TcpCreateStreamRequest struct {
- Name string
+ Name string
}
const (
- nameLengthOffset = 0
- payloadOffset = 1
+ nameLengthOffset = 0
+ payloadOffset = 1
)
func (request *TcpCreateStreamRequest) Serialize() []byte {
- nameLength := len(request.Name)
- serialized := make([]byte, payloadOffset+nameLength)
- serialized[nameLengthOffset] = byte(nameLength)
- copy(serialized[payloadOffset:], []byte(request.Name))
- return serialized
+ nameLength := len(request.Name)
+ serialized := make([]byte, payloadOffset+nameLength)
+ serialized[nameLengthOffset] = byte(nameLength)
+ copy(serialized[payloadOffset:], []byte(request.Name))
+ return serialized
}
diff --git a/foreign/go/binary_serialization/create_stream_serializer_test.go
b/foreign/go/binary_serialization/create_stream_serializer_test.go
index 6adc74021..45a520df1 100644
--- a/foreign/go/binary_serialization/create_stream_serializer_test.go
+++ b/foreign/go/binary_serialization/create_stream_serializer_test.go
@@ -25,7 +25,7 @@ import (
func TestSerialize_TcpCreateStreamRequest(t *testing.T) {
// Create a sample TcpCreateStreamRequest
request := TcpCreateStreamRequest{
- Name: "test_stream",
+ Name: "test_stream",
}
// Serialize the request
diff --git a/foreign/go/binary_serialization/create_topic_serializer.go
b/foreign/go/binary_serialization/create_topic_serializer.go
index cf60bc75c..fc5f5f9ec 100644
--- a/foreign/go/binary_serialization/create_topic_serializer.go
+++ b/foreign/go/binary_serialization/create_topic_serializer.go
@@ -40,7 +40,7 @@ func (request *TcpCreateTopicRequest) Serialize() []byte {
streamIdBytes := SerializeIdentifier(request.StreamId)
nameBytes := []byte(request.Name)
- totalLength := len(streamIdBytes) + // StreamId
+ totalLength := len(streamIdBytes) + // StreamId
4 + // PartitionsCount
1 + // CompressionAlgorithm
8 + // MessageExpiry
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
index 23fd4bb29..798918e48 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
+++ b/foreign/go/binary_serialization/fetch_messages_request_serializer.go
@@ -24,11 +24,11 @@ import (
)
const (
- partitionPresenceSize = 1
- partitionFieldSize = 4
- partitionStrategySize = partitionPresenceSize + partitionFieldSize + 1
- offsetSize = 12
- commitFlagSize = 1
+ partitionPresenceSize = 1
+ partitionFieldSize = 4
+ partitionStrategySize = partitionPresenceSize + partitionFieldSize + 1
+ offsetSize = 12
+ commitFlagSize = 1
)
type TcpFetchMessagesRequest struct {
@@ -53,18 +53,18 @@ func (request *TcpFetchMessagesRequest) Serialize() []byte {
copy(bytes[position:position+len(consumerIdBytes)], consumerIdBytes)
position += len(consumerIdBytes)
- copy(bytes[position:position+len(streamIdBytes)], streamIdBytes)
+ copy(bytes[position:position+len(streamIdBytes)], streamIdBytes)
position += len(streamIdBytes)
- copy(bytes[position:position+len(topicIdBytes)], topicIdBytes)
+ copy(bytes[position:position+len(topicIdBytes)], topicIdBytes)
position += len(topicIdBytes)
- if request.PartitionId != nil {
- bytes[position] = 1
- binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*request.PartitionId)
- } else {
- bytes[position] = 0
- binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0)
- }
- bytes[position+1+4] = byte(request.Strategy.Kind)
+ if request.PartitionId != nil {
+ bytes[position] = 1
+ binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*request.PartitionId)
+ } else {
+ bytes[position] = 0
+ binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0)
+ }
+ bytes[position+1+4] = byte(request.Strategy.Kind)
position += partitionStrategySize
binary.LittleEndian.PutUint64(bytes[position:position+8],
request.Strategy.Value)
diff --git a/foreign/go/contracts/consumer.go b/foreign/go/contracts/consumer.go
index 99bfcf2d9..84195ebba 100644
--- a/foreign/go/contracts/consumer.go
+++ b/foreign/go/contracts/consumer.go
@@ -30,7 +30,7 @@ type Consumer struct {
}
func DefaultConsumer() Consumer {
- defaultID, _ := NewIdentifier(uint32(0))
+ defaultID, _ := NewIdentifier(uint32(0))
return Consumer{
Kind: ConsumerKindSingle,
Id: defaultID,
diff --git a/foreign/go/contracts/consumer_groups.go
b/foreign/go/contracts/consumer_groups.go
index aaef317ee..5018fd037 100644
--- a/foreign/go/contracts/consumer_groups.go
+++ b/foreign/go/contracts/consumer_groups.go
@@ -36,9 +36,9 @@ type ConsumerGroupMember struct {
}
type CreateConsumerGroupRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Name string `json:"name"`
+ StreamId Identifier `json:"streamId"`
+ TopicId Identifier `json:"topicId"`
+ Name string `json:"name"`
}
type DeleteConsumerGroupRequest struct {
diff --git a/foreign/go/iggycli/client.go b/foreign/go/iggycli/client.go
index d444e2083..bc198d207 100644
--- a/foreign/go/iggycli/client.go
+++ b/foreign/go/iggycli/client.go
@@ -30,9 +30,9 @@ type Client interface {
// Authentication is required, and the permission to read the streams.
GetStreams() ([]iggcon.Stream, error)
- // CreateStream create a new stream.
- // Authentication is required, and the permission to manage the streams.
- CreateStream(name string) (*iggcon.StreamDetails, error)
+ // CreateStream create a new stream.
+ // Authentication is required, and the permission to manage the streams.
+ CreateStream(name string) (*iggcon.StreamDetails, error)
// UpdateStream update a stream by unique ID or name.
// Authentication is required, and the permission to manage the streams.
@@ -52,15 +52,15 @@ type Client interface {
// CreateTopic create a new topic.
// Authentication is required, and the permission to manage the topics.
- CreateTopic(
- streamId iggcon.Identifier,
- name string,
- partitionsCount uint32,
- compressionAlgorithm iggcon.CompressionAlgorithm,
- messageExpiry iggcon.Duration,
- maxTopicSize uint64,
- replicationFactor *uint8,
- ) (*iggcon.TopicDetails, error)
+ CreateTopic(
+ streamId iggcon.Identifier,
+ name string,
+ partitionsCount uint32,
+ compressionAlgorithm iggcon.CompressionAlgorithm,
+ messageExpiry iggcon.Duration,
+ maxTopicSize uint64,
+ replicationFactor *uint8,
+ ) (*iggcon.TopicDetails, error)
// UpdateTopic update a topic by unique ID or name.
// Authentication is required, and the permission to manage the topics.
@@ -141,11 +141,11 @@ type Client interface {
// CreateConsumerGroup create a new consumer group for the given stream
and topic by unique IDs or names.
// Authentication is required, and the permission to manage the streams
or topics.
- CreateConsumerGroup(
- streamId iggcon.Identifier,
- topicId iggcon.Identifier,
- name string,
- ) (*iggcon.ConsumerGroupDetails, error)
+ CreateConsumerGroup(
+ streamId iggcon.Identifier,
+ topicId iggcon.Identifier,
+ name string,
+ ) (*iggcon.ConsumerGroupDetails, error)
// DeleteConsumerGroup delete a consumer group by unique ID or name for
the given stream and topic by unique IDs or names.
// Authentication is required, and the permission to manage the streams
or topics.
diff --git a/foreign/go/samples/consumer/consumer.go
b/foreign/go/samples/consumer/consumer.go
index 6efbe0eaf..9ba7a58b5 100644
--- a/foreign/go/samples/consumer/consumer.go
+++ b/foreign/go/samples/consumer/consumer.go
@@ -31,11 +31,11 @@ import (
// config
const (
- DefaultStreamId = uint32(0)
- TopicId = uint32(0)
- Partition = 0
- Interval = 1000
- ConsumerId = uint32(0)
+ DefaultStreamId = uint32(0)
+ TopicId = uint32(0)
+ Partition = 0
+ Interval = 1000
+ ConsumerId = uint32(0)
)
func main() {
@@ -63,8 +63,8 @@ func main() {
func EnsureInfrastructureIsInitialized(cli iggycli.Client) error {
streamIdentifier, _ := iggcon.NewIdentifier(DefaultStreamId)
- if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
- _, streamErr = cli.CreateStream("Test Producer Stream")
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
+ _, streamErr = cli.CreateStream("Test Producer Stream")
if streamErr != nil {
panic(streamErr)
@@ -76,15 +76,15 @@ func EnsureInfrastructureIsInitialized(cli iggycli.Client)
error {
fmt.Printf("Stream with ID: %d exists.\n", DefaultStreamId)
topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
- if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
- _, topicErr = cli.CreateTopic(
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
+ _, topicErr = cli.CreateTopic(
streamIdentifier,
"Test Topic From Producer Sample",
12,
0,
0,
0,
- nil)
+ nil)
if topicErr != nil {
panic(topicErr)
diff --git a/foreign/go/samples/producer/producer.go
b/foreign/go/samples/producer/producer.go
index 2afc5a2e0..11821525a 100644
--- a/foreign/go/samples/producer/producer.go
+++ b/foreign/go/samples/producer/producer.go
@@ -30,11 +30,11 @@ import (
)
const (
- StreamId = uint32(0)
- TopicId = uint32(0)
- MessageBatchCount = 1
- Partition = 0
- Interval = 1000
+ StreamId = uint32(0)
+ TopicId = uint32(0)
+ MessageBatchCount = 1
+ Partition = 0
+ Interval = 1000
)
func main() {
@@ -62,8 +62,8 @@ func main() {
func EnsureInfrastructureIsInitialized(cli iggycli.Client) error {
streamIdentifier, _ := iggcon.NewIdentifier(StreamId)
- if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
- _, streamErr = cli.CreateStream("Test Producer Stream")
+ if _, streamErr := cli.GetStream(streamIdentifier); streamErr != nil {
+ _, streamErr = cli.CreateStream("Test Producer Stream")
fmt.Println(StreamId)
@@ -77,15 +77,15 @@ func EnsureInfrastructureIsInitialized(cli iggycli.Client)
error {
fmt.Printf("Stream with ID: %d exists.\n", StreamId)
topicIdentifier, _ := iggcon.NewIdentifier(TopicId)
- if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
- _, topicErr = cli.CreateTopic(
+ if _, topicErr := cli.GetTopic(streamIdentifier, topicIdentifier);
topicErr != nil {
+ _, topicErr = cli.CreateTopic(
streamIdentifier,
"Test Topic From Producer Sample",
12,
0,
0,
0,
- nil)
+ nil)
if topicErr != nil {
panic(topicErr)
diff --git a/foreign/go/tcp/tcp_consumer_group_managament.go
b/foreign/go/tcp/tcp_consumer_group_managament.go
index e0e9debd4..bc33e659f 100644
--- a/foreign/go/tcp/tcp_consumer_group_managament.go
+++ b/foreign/go/tcp/tcp_consumer_group_managament.go
@@ -52,9 +52,9 @@ func (tms *IggyTcpClient) CreateConsumerGroup(streamId
iggcon.Identifier, topicI
return nil, ierror.ErrInvalidConsumerGroupName
}
message :=
binaryserialization.CreateGroup(iggcon.CreateConsumerGroupRequest{
- StreamId: streamId,
- TopicId: topicId,
- Name: name,
+ StreamId: streamId,
+ TopicId: topicId,
+ Name: name,
})
buffer, err := tms.sendAndFetchResponse(message, iggcon.CreateGroupCode)
if err != nil {
diff --git a/foreign/go/tcp/tcp_stream_managament.go
b/foreign/go/tcp/tcp_stream_managament.go
index 1febf5bad..d3bb7b30b 100644
--- a/foreign/go/tcp/tcp_stream_managament.go
+++ b/foreign/go/tcp/tcp_stream_managament.go
@@ -54,7 +54,7 @@ func (tms *IggyTcpClient) CreateStream(name string)
(*iggcon.StreamDetails, erro
if len(name) == 0 || MaxStringLength < len(name) {
return nil, ierror.ErrInvalidStreamName
}
- serializedRequest := binaryserialization.TcpCreateStreamRequest{Name: name}
+ serializedRequest := binaryserialization.TcpCreateStreamRequest{Name:
name}
buffer, err := tms.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.CreateStreamCode)
if err != nil {
return nil, err
diff --git a/foreign/go/tcp/tcp_topic_managament.go
b/foreign/go/tcp/tcp_topic_managament.go
index 931e0f39a..13ff22662 100644
--- a/foreign/go/tcp/tcp_topic_managament.go
+++ b/foreign/go/tcp/tcp_topic_managament.go
@@ -58,7 +58,7 @@ func (tms *IggyTcpClient) CreateTopic(
compressionAlgorithm iggcon.CompressionAlgorithm,
messageExpiry iggcon.Duration,
maxTopicSize uint64,
- replicationFactor *uint8,
+ replicationFactor *uint8,
) (*iggcon.TopicDetails, error) {
if len(name) == 0 || len(name) > MaxStringLength {
return nil, ierror.ErrInvalidTopicName
diff --git a/web/src/lib/components/Layouts/SettingsLayout.svelte
b/web/src/lib/components/Layouts/SettingsLayout.svelte
index ee7fd6912..3b0d0f45a 100644
--- a/web/src/lib/components/Layouts/SettingsLayout.svelte
+++ b/web/src/lib/components/Layouts/SettingsLayout.svelte
@@ -41,27 +41,21 @@
tab: 'server',
icon: 'adjustments',
name: 'Server',
- href: resolve(typedRoute('/dashboard/settings/server'))
+ path: typedRoute('/dashboard/settings/server')
},
{
tab: 'webUI',
icon: 'settings',
name: 'Web UI',
- href: resolve(typedRoute('/dashboard/settings/webUI'))
+ path: typedRoute('/dashboard/settings/webUI')
},
{
tab: 'users',
icon: 'usersGroup',
name: 'Users',
- href: resolve(typedRoute('/dashboard/settings/users'))
+ path: typedRoute('/dashboard/settings/users')
}
- // {
- // name: 'Terminal',
- // icon: 'terminal',
- // tab: 'terminal',
- // href: resolve(typedRoute('/dashboard/settings/terminal'))
- // }
- ] satisfies { tab: Tabs; name: string; icon: iconType; href: string }[];
+ ] satisfies { tab: Tabs; name: string; icon: iconType; path: string }[];
</script>
<div class="flex justify-between items-center px-10">
@@ -71,10 +65,10 @@
</div>
<div class="flex gap-12 border-b px-10">
- {#each tabs as { icon, name, href }, idx (idx)}
- {@const isActive = activeTab === href.split('/').slice(-1)[0]}
+ {#each tabs as { icon, name, path }, idx (idx)}
+ {@const isActive = activeTab === path.split('/').slice(-1)[0]}
<a
- {href}
+ href={resolve(path)}
class={twMerge('pb-3 relative group flex items-center justify-start
gap-2 text-color')}
>
<Icon name={icon} class="w-[15px] h-[15px]" />
diff --git a/web/src/lib/components/Modals/InspectMessage.svelte
b/web/src/lib/components/Modals/InspectMessage.svelte
index 1e58c4f2b..dc3d9aa4d 100644
--- a/web/src/lib/components/Modals/InspectMessage.svelte
+++ b/web/src/lib/components/Modals/InspectMessage.svelte
@@ -18,6 +18,7 @@
-->
<script lang="ts">
+ import { SvelteSet } from 'svelte/reactivity';
import ModalBase from './ModalBase.svelte';
import type { CloseModalFn } from '$lib/types/utilTypes';
import { type Message, type HeaderEntry, type HeaderField } from
'$lib/domain/Message';
@@ -86,7 +87,7 @@
}
};
- let expandedHeaders: Set<number> = $state(new Set());
+ let expandedHeaders = new SvelteSet<number>();
const toggleHeaderExpand = (index: number) => {
if (expandedHeaders.has(index)) {
@@ -94,7 +95,6 @@
} else {
expandedHeaders.add(index);
}
- expandedHeaders = new Set(expandedHeaders);
};
const findHeaderByStringKey = (
@@ -167,7 +167,7 @@
<span class="text-shade-l900 dark:text-shade-l700">No
headers</span>
{:else}
<div class="flex flex-col gap-1">
- {#each message.user_headers as entry, index}
+ {#each message.user_headers as entry, index (index)}
{@const keyValue = decodeHeaderValue(entry.key.kind,
entry.key.value)}
{@const valueValue = decodeHeaderValue(entry.value.kind,
entry.value.value)}
{@const isExpanded = expandedHeaders.has(index)}
diff --git a/web/src/lib/components/Navbar.svelte
b/web/src/lib/components/Navbar.svelte
index 55524b703..3e066bf6d 100644
--- a/web/src/lib/components/Navbar.svelte
+++ b/web/src/lib/components/Navbar.svelte
@@ -31,34 +31,22 @@
{
name: 'Overview',
icon: 'home',
- href: resolve(typedRoute('/dashboard/overview')),
+ path: typedRoute('/dashboard/overview'),
active: page.url.pathname.includes(typedRoute('/dashboard/overview'))
},
{
name: 'Streams',
icon: 'stream',
- href: resolve(typedRoute('/dashboard/streams')),
+ path: typedRoute('/dashboard/streams'),
active: page.url.pathname.includes(typedRoute('/dashboard/streams'))
},
- // {
- // name: 'Clients',
- // icon: 'clients',
- // href: resolve(typedRoute('/dashboard/clients')),
- // active: page.url.pathname.includes(typedRoute('/dashboard/clients'))
- // },
- // {
- // name: 'Logs',
- // icon: 'logs',
- // href: resolve(typedRoute('/dashboard/logs')),
- // active: page.url.pathname.includes(typedRoute('/dashboard/logs'))
- // },
{
name: 'Settings',
icon: 'settings',
- href: resolve(typedRoute('/dashboard/settings/webUI')),
+ path: typedRoute('/dashboard/settings/webUI'),
active: page.url.pathname.includes('/dashboard/settings')
}
- ] satisfies { name: string; icon: iconType; href: string; active: boolean
}[]);
+ ] satisfies { name: string; icon: iconType; path: string; active: boolean
}[]);
</script>
<nav
@@ -73,11 +61,11 @@
</a>
<ul class="flex flex-col gap-7">
- {#each navItems as { name, icon, href, active } (name + href)}
+ {#each navItems as { name, icon, path, active } (name + path)}
<li>
<div use:tooltip={{ placement: 'right' }}>
<a
- {href}
+ href={resolve(path)}
data-trigger
class={twMerge(
'p-2 block rounded-xl transition-colors ring-2
ring-transparent',
diff --git a/web/src/routes/auth/sign-in/+page.svelte
b/web/src/routes/auth/sign-in/+page.svelte
index 31e0aca1b..5fa96ca0d 100644
--- a/web/src/routes/auth/sign-in/+page.svelte
+++ b/web/src/routes/auth/sign-in/+page.svelte
@@ -90,7 +90,7 @@
authStore.login(access_token.token, access_token.expiry);
goto(resolve(typedRoute('/dashboard/overview')));
- } catch (e) {
+ } catch {
errorMessage = 'Failed to connect to server';
isLoading = false;
}
diff --git
a/web/src/routes/dashboard/streams/[streamId=i32]/topics/[topicId=i32]/partitions/[partitionId=i32]/messages/+page.svelte
b/web/src/routes/dashboard/streams/[streamId=i32]/topics/[topicId=i32]/partitions/[partitionId=i32]/messages/+page.svelte
index 2355bc7bd..a4638196d 100644
---
a/web/src/routes/dashboard/streams/[streamId=i32]/topics/[topicId=i32]/partitions/[partitionId=i32]/messages/+page.svelte
+++
b/web/src/routes/dashboard/streams/[streamId=i32]/topics/[topicId=i32]/partitions/[partitionId=i32]/messages/+page.svelte
@@ -67,7 +67,7 @@
// TODO: https://github.com/sveltejs/kit/issues/14750
await goto(
- // eslint-disable-next-line svelte/no-navigation-without-resolve
+ // eslint-disable-next-line svelte/no-navigation-without-resolve
resolve(
`/dashboard/streams/${page.params.streamId}/topics/${page.params.topicId}/partitions/${page.params.partitionId}/messages`
) + `?${searchParams}`,
@@ -95,9 +95,7 @@
variant="rounded"
class="mr-5"
onclick={() =>
- goto(
-
resolve(`/dashboard/streams/${page.params.streamId}/topics/${page.params.topicId}`)
- )}
+
goto(resolve(`/dashboard/streams/${page.params.streamId}/topics/${page.params.topicId}`))}
>
<Icon name="arrowLeft" class="h-[40px] w-[30px]" />
</Button>